/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rd.h"
#include "rdfloat.h"

#include <stdlib.h>
#include <ctype.h>
#include <stddef.h>

#include "rdkafka_int.h"
#include "rdkafka_feature.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_assignor.h"
#include "rdkafka_sasl_oauthbearer.h"
#if WITH_PLUGINS
#include "rdkafka_plugin.h"
#endif
#include "rdunittest.h"

#ifndef _WIN32
#include <netinet/tcp.h>
#else

#ifndef WIN32_MEAN_AND_LEAN
#define WIN32_MEAN_AND_LEAN
#endif
#include <windows.h>
#endif

/**
 * @brief Describes a single configuration property: its name, value type,
 *        where its value is stored in the configuration object, its
 *        defaults and limits, and optional per-property hooks.
 *
 * @remark The property table initializes the leading members
 *         (scope, name, type, offset, desc, vmin, vmax, vdef) positionally,
 *         so the order of these members must not be changed.
 */
struct rd_kafka_property {
	rd_kafka_conf_scope_t scope; /* Scope, OR:ed with property flags
				      * in the property table */
	const char *name;            /* Property name, e.g. "client.id" */
	enum {
		_RK_C_STR,  /* String */
		_RK_C_INT,  /* Integer */
                _RK_C_DBL,  /* Double */
		_RK_C_S2I,  /* String to Integer mapping.
			     * Supports limited canonical str->int mappings
			     * using s2i[] */
		_RK_C_S2F,  /* CSV String to Integer flag mapping (OR:ed) */
		_RK_C_BOOL, /* Boolean */
		_RK_C_PTR,  /* Only settable through special set functions */
                _RK_C_PATLIST, /* Pattern list */
                _RK_C_KSTR, /* Kafka string */
                _RK_C_ALIAS, /* Alias: points to other property through .sdef */
                _RK_C_INTERNAL, /* Internal, don't expose to application */
                _RK_C_INVALID,  /* Invalid property, used to catch known
                                 * but unsupported Java properties. */
	} type;
	int   offset;      /* Byte offset of the backing field in the conf
			    * struct (see _RK() and _RKT()).
			    * Also used as the property's unique index by
			    * rd_kafka_prop2idx(). */
	const char *desc;  /* Human readable description */
	int   vmin;        /* Lower value limit (int).
			    * NOTE(review): presumably inclusive, the
			    * enforcement is not visible here. */
	int   vmax;        /* Upper value limit (int), see vmin. */
	int   vdef;        /* Default value (int) */
	const char *sdef;  /* Default value (string).
			    * For _RK_C_ALIAS: name of the aliased property. */
        void  *pdef;       /* Default value (pointer) */
        double ddef;       /* Default value (double) */
        double dmin;       /* Lower value limit (double) */
        double dmax;       /* Upper value limit (double) */
	struct {
		int val;         /* Integer (or flag) value */
		const char *str; /* String representation of val */
                const char *unsupported; /**< Reason for value not being
                                          *   supported in this build. */
	} s2i[20];  /* _RK_C_S2I and _RK_C_S2F */

        const char *unsupported; /**< Reason for property not being supported
                                  *   in this build.
                                  *   Will be included in the conf_set()
                                  *   error string. */

	/* Value validator (STR).
	 * NOTE(review): the validators visible in this file return non-zero
	 * for an accepted value and 0 for a rejected one. */
	int (*validate) (const struct rd_kafka_property *prop,
			 const char *val, int ival);

        /* Configuration object constructors and destructor for use when
         * the property value itself is not used, or needs extra care. */
        void (*ctor) (int scope, void *pconf);
        void (*dtor) (int scope, void *pconf);
        void (*copy) (int scope, void *pdst, const void *psrc,
                      void *dstptr, const void *srcptr,
                      size_t filter_cnt, const char **filter);

        /* Optional property-specific setter hook. */
        rd_kafka_conf_res_t (*set) (int scope, void *pconf,
                                    const char *name, const char *value,
                                    void *dstptr,
                                    rd_kafka_conf_set_mode_t set_mode,
                                    char *errstr, size_t errstr_size);
};


/* Byte offset of \p field within the global configuration struct
 * (rd_kafka_conf_t) and the topic configuration struct
 * (rd_kafka_topic_conf_t) respectively.
 * Used to fill in the .offset member of struct rd_kafka_property. */
#define _RK(field)  offsetof(rd_kafka_conf_t, field)
#define _RKT(field) offsetof(rd_kafka_topic_conf_t, field)

/*
 * The _UNSUPPORTED_.. macros below each expand to a designated initializer
 * for the `.unsupported` member of a property (or s2i value) entry:
 * NULL when the corresponding feature is available in this build,
 * otherwise a string stating the reason it is not.
 */

/* SSL support */
#if WITH_SSL
#define _UNSUPPORTED_SSL .unsupported = NULL
#else
#define _UNSUPPORTED_SSL .unsupported = "OpenSSL not available at build time"
#endif

/* Features requiring OpenSSL >= 1.0.2 (and not LibreSSL) */
#if OPENSSL_VERSION_NUMBER >= 0x1000200fL && defined(WITH_SSL) && !defined(LIBRESSL_VERSION_NUMBER)
#define _UNSUPPORTED_OPENSSL_1_0_2 .unsupported = NULL
#else
#define _UNSUPPORTED_OPENSSL_1_0_2 .unsupported = \
                "OpenSSL >= 1.0.2 not available at build time"
#endif


/* zlib (gzip) support */
#if WITH_ZLIB
#define _UNSUPPORTED_ZLIB .unsupported = NULL
#else
#define _UNSUPPORTED_ZLIB .unsupported = "zlib not available at build time"
#endif

/* snappy support */
#if WITH_SNAPPY
#define _UNSUPPORTED_SNAPPY .unsupported = NULL
#else
#define _UNSUPPORTED_SNAPPY .unsupported = "snappy not enabled at build time"
#endif

/* zstd support */
#if WITH_ZSTD
#define _UNSUPPORTED_ZSTD .unsupported = NULL
#else
#define _UNSUPPORTED_ZSTD .unsupported = "libzstd not available at build time"
#endif

/* Kerberos keytab related properties: unsupported on Windows builds */
#ifdef _WIN32
#define _UNSUPPORTED_WIN32_GSSAPI .unsupported =                        \
                "Kerberos keytabs are not supported on Windows, "       \
                "instead the logged on "                                \
                "user's credentials are used through native SSPI"
#else
        #define _UNSUPPORTED_WIN32_GSSAPI .unsupported = NULL
#endif

/* GSSAPI: available on Windows builds or when built with cyrus-sasl */
#if defined(_WIN32) || defined(WITH_SASL_CYRUS)
#define _UNSUPPORTED_GSSAPI .unsupported = NULL
#else
#define _UNSUPPORTED_GSSAPI .unsupported = \
                "cyrus-sasl/libsasl2 not available at build time"
#endif

/* OAUTHBEARER has the same build requirement as SSL */
#define _UNSUPPORTED_OAUTHBEARER _UNSUPPORTED_SSL


/* Forward declaration of a file-local function whose definition is not
 * part of this section of the file.
 * NOTE(review): judging by the signature it retrieves the value of \p prop
 * from \p conf into \p dest / \p dest_size - confirm against the
 * definition. */
static rd_kafka_conf_res_t
rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop,
                       char *dest, size_t *dest_size);




/**
 * @brief Map property \p prop to its index.
 *
 * @returns the \c offset member of \p prop, i.e., the byte position of the
 *          property's field, which serves as the property's unique index.
 */
static RD_INLINE int rd_kafka_prop2idx (const struct rd_kafka_property *prop) {
        const int field_offset = prop->offset;

        return field_offset;
}



/**
 * @brief Mark, or unmark, property \p prop as modified in \p conf.
 *
 * The property's index (see rd_kafka_prop2idx()) selects one bit in the
 * \c modified bit vector of the configuration header that \p conf
 * starts with: a set bit means the property has been modified, a cleared
 * bit means it is still at its default, unmodified, value.
 *
 * @param conf        Configuration object, accessed through its
 *                    struct rd_kafka_anyconf_hdr header.
 * @param prop        Property to mark.
 * @param is_modified non-zero: set as modified, 0: clear modified.
 */
static void rd_kafka_anyconf_set_modified (void *conf,
                                           const struct rd_kafka_property *prop,
                                           int is_modified) {
        struct rd_kafka_anyconf_hdr *hdr = conf;
        const int propidx = rd_kafka_prop2idx(prop);
        /* 64 property bits per word of the bit vector */
        const int word = propidx / 64;
        const uint64_t mask = (uint64_t)1 << (propidx % 64);

        rd_assert(propidx < RD_KAFKA_CONF_PROPS_IDX_MAX &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");

        if (!is_modified) {
                hdr->modified[word] &= ~mask;
                return;
        }

        hdr->modified[word] |= mask;
}

/**
 * @brief Reset the modified state of every property in \p conf by
 *        zeroing the configuration header.
 *
 * @warning The property values themselves are NOT cleared or reset.
 */
static void rd_kafka_anyconf_clear_all_is_modified (void *conf) {
        struct rd_kafka_anyconf_hdr *hdr = conf;

        memset(hdr, 0, sizeof(struct rd_kafka_anyconf_hdr));
}


/**
 * @brief Check the modified bit of property \p prop in \p conf.
 *
 * @returns true if the property has been set/modified, else false.
 */
static rd_bool_t
rd_kafka_anyconf_is_modified (const void *conf,
                              const struct rd_kafka_property *prop) {
        const struct rd_kafka_anyconf_hdr *hdr = conf;
        const int propidx = rd_kafka_prop2idx(prop);
        /* 64 property bits per word of the bit vector */
        const int word = propidx / 64;
        const uint64_t mask = (uint64_t)1 << (propidx % 64);

        return (hdr->modified[word] & mask) != 0;
}



/**
 * @brief Validator for the \p broker.version.fallback property.
 *
 * @returns the result of looking up \p val with
 *          rd_kafka_get_legacy_ApiVersions().
 *          \p prop and \p ival are not used.
 */
static int
rd_kafka_conf_validate_broker_version (const struct rd_kafka_property *prop,
                                       const char *val, int ival) {
        /* Only the lookup result is of interest, the returned
         * ApiVersion list is discarded. */
        struct rd_kafka_ApiVersion *unused_apis;
        size_t unused_cnt;
        int res;

        res = rd_kafka_get_legacy_ApiVersions(val, &unused_apis,
                                              &unused_cnt, NULL);
        return res;
}

/**
 * @brief Validate that string \p val is a single item, i.e., that it
 *        contains none of the list delimiters (comma, space).
 *
 * @returns 1 if \p val contains no delimiter, else 0.
 *          \p prop and \p ival are not used.
 */
static RD_UNUSED int
rd_kafka_conf_validate_single (const struct rd_kafka_property *prop,
                               const char *val, int ival) {
        /* strpbrk() returns NULL when none of the delimiters occur */
        return strpbrk(val, ", ") == NULL;
}

/**
 * @brief Validate that \p val is the name of a builtin partitioner.
 *
 * @returns 1 if \p val exactly matches one of the builtin partitioner
 *          names, else 0. \p prop and \p ival are not used.
 */
static RD_UNUSED int
rd_kafka_conf_validate_partitioner (const struct rd_kafka_property *prop,
                                    const char *val, int ival) {
        static const char *const builtin_names[] = {
                "random",
                "consistent",
                "consistent_random",
                "murmur2",
                "murmur2_random",
                "fnv1a",
                "fnv1a_random"
        };
        size_t i;

        for (i = 0 ; i < sizeof(builtin_names) / sizeof(*builtin_names) ; i++) {
                if (strcmp(val, builtin_names[i]) == 0)
                        return 1;
        }

        return 0;
}


/**
 * librdkafka configuration property definitions.
 */
static const struct rd_kafka_property rd_kafka_properties[] = {
	/* Global properties */
	{ _RK_GLOBAL, "builtin.features", _RK_C_S2F, _RK(builtin_features),
	"Indicates the builtin features for this build of librdkafka. "
	"An application can either query this value or attempt to set it "
	"with its list of required features to check for library support.",
	0, 0x7fffffff, 0xffff,
          .s2i = {
                        { 0x1, "gzip", _UNSUPPORTED_ZLIB },
                        { 0x2, "snappy", _UNSUPPORTED_SNAPPY },
                        { 0x4, "ssl", _UNSUPPORTED_SSL },
                        { 0x8, "sasl" },
                        { 0x10, "regex" },
                        { 0x20, "lz4" },
                        { 0x40, "sasl_gssapi", _UNSUPPORTED_GSSAPI },
                        { 0x80, "sasl_plain" },
                        { 0x100, "sasl_scram", _UNSUPPORTED_SSL },
                        { 0x200, "plugins"
#if !WITH_PLUGINS
                          , .unsupported = "libdl/dlopen(3) not available at "
                          "build time"
#endif
                        },
                        { 0x400, "zstd", _UNSUPPORTED_ZSTD },
                        { 0x800, "sasl_oauthbearer", _UNSUPPORTED_SSL },
                        { 0, NULL }
                }
	},
	{ _RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str),
	  "Client identifier.",
	  .sdef =  "rdkafka" },
        { _RK_GLOBAL|_RK_HIDDEN, "client.software.name", _RK_C_STR,
          _RK(sw_name),
          "Client software name as reported to broker version >= v2.4.0. "
          "Broker-side character restrictions apply, as of broker version "
          "v2.4.0 the allowed characters are `a-zA-Z0-9.-`. The local client "
          "will replace any other character with `-` and strip leading and "
          "trailing non-alphanumeric characters before tranmission to "
          "the broker. "
          "This property should only be set by high-level language "
          "librdkafka client bindings.",
          .sdef = "librdkafka"
        },
        { _RK_GLOBAL|_RK_HIDDEN, "client.software.version", _RK_C_STR,
          _RK(sw_version),
          "Client software version as reported to broker version >= v2.4.0. "
          "Broker-side character restrictions apply, as of broker version "
          "v2.4.0 the allowed characters are `a-zA-Z0-9.-`. The local client "
          "will replace any other character with `-` and strip leading and "
          "trailing non-alphanumeric characters before tranmission to "
          "the broker. "
          "This property should only be set by high-level language "
          "librdkafka client bindings."
          "If changing this property it is highly recommended to append the "
          "librdkafka version.",
        },
	{ _RK_GLOBAL|_RK_HIGH, "metadata.broker.list", _RK_C_STR,
          _RK(brokerlist),
	  "Initial list of brokers as a CSV list of broker host or host:port. "
	  "The application may also use `rd_kafka_brokers_add()` to add "
	  "brokers during runtime." },
	{ _RK_GLOBAL|_RK_HIGH, "bootstrap.servers", _RK_C_ALIAS, 0,
	  "See metadata.broker.list",
	  .sdef = "metadata.broker.list" },
        { _RK_GLOBAL|_RK_MED, "message.max.bytes", _RK_C_INT, _RK(max_msg_size),
          "Maximum Kafka protocol request message size. "
          "Due to differing framing overhead between protocol versions the "
          "producer is unable to reliably enforce a strict max message limit "
          "at produce time and may exceed the maximum size by one message in "
          "protocol ProduceRequests, the broker will enforce the the topic's "
          "`max.message.bytes` limit (see Apache Kafka documentation).",
          1000, 1000000000, 1000000 },
	{ _RK_GLOBAL, "message.copy.max.bytes", _RK_C_INT,
	  _RK(msg_copy_max_size),
	  "Maximum size for message to be copied to buffer. "
	  "Messages larger than this will be passed by reference (zero-copy) "
	  "at the expense of larger iovecs.",
	  0, 1000000000, 0xffff },
	{ _RK_GLOBAL|_RK_MED, "receive.message.max.bytes", _RK_C_INT,
          _RK(recv_max_msg_size),
          "Maximum Kafka protocol response message size. "
          "This serves as a safety precaution to avoid memory exhaustion in "
          "case of protocol hickups. "
          "This value must be at least `fetch.max.bytes`  + 512 to allow "
          "for protocol overhead; the value is adjusted automatically "
          "unless the configuration property is explicitly set.",
	  1000, INT_MAX, 100000000 },
	{ _RK_GLOBAL, "max.in.flight.requests.per.connection", _RK_C_INT,
	  _RK(max_inflight),
	  "Maximum number of in-flight requests per broker connection. "
	  "This is a generic property applied to all broker communication, "
	  "however it is primarily relevant to produce requests. "
	  "In particular, note that other mechanisms limit the number "
	  "of outstanding consumer fetch request per broker to one.",
	  1, 1000000, 1000000 },
        { _RK_GLOBAL, "max.in.flight", _RK_C_ALIAS,
          .sdef = "max.in.flight.requests.per.connection" },
	{ _RK_GLOBAL, "metadata.request.timeout.ms", _RK_C_INT,
	  _RK(metadata_request_timeout_ms),
	  "Non-topic request timeout in milliseconds. "
	  "This is for metadata requests, etc.",
	  10, 900*1000, 60*1000},
        { _RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT,
          _RK(metadata_refresh_interval_ms),
          "Period of time in milliseconds at which topic and broker "
          "metadata is refreshed in order to proactively discover any new "
          "brokers, topics, partitions or partition leader changes. "
          "Use -1 to disable the intervalled refresh (not recommended). "
          "If there are no locally referenced topics "
          "(no topic objects created, no messages produced, "
          "no subscription or no assignment) then only the broker list will "
          "be refreshed every interval but no more often than every 10s.",
          -1, 3600*1000, 5*60*1000 },
	{ _RK_GLOBAL, "metadata.max.age.ms", _RK_C_INT,
          _RK(metadata_max_age_ms),
          "Metadata cache max age. "
          "Defaults to topic.metadata.refresh.interval.ms * 3",
          1, 24*3600*1000, 5*60*1000 * 3 },
        { _RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
          _RK(metadata_refresh_fast_interval_ms),
          "When a topic loses its leader a new metadata request will be "
          "enqueued with this initial interval, exponentially increasing "
          "until the topic metadata has been refreshed. "
          "This is used to recover quickly from transitioning leader brokers.",
          1, 60*1000, 250 },
        { _RK_GLOBAL|_RK_DEPRECATED,
          "topic.metadata.refresh.fast.cnt", _RK_C_INT,
          _RK(metadata_refresh_fast_cnt),
          "No longer used.",
          0, 1000, 10 },
        { _RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
          _RK(metadata_refresh_sparse),
          "Sparse metadata requests (consumes less network bandwidth)",
          0, 1, 1 },
        { _RK_GLOBAL, "topic.metadata.propagation.max.ms", _RK_C_INT,
          _RK(metadata_propagation_max_ms),
          "Apache Kafka topic creation is asynchronous and it takes some "
          "time for a new topic to propagate throughout the cluster to all "
          "brokers. "
          "If a client requests topic metadata after manual topic creation but "
          "before the topic has been fully propagated to the broker the "
          "client is requesting metadata from, the topic will seem to be "
          "non-existent and the client will mark the topic as such, "
          "failing queued produced messages with `ERR__UNKNOWN_TOPIC`. "
          "This setting delays marking a topic as non-existent until the "
          "configured propagation max time has passed. "
          "The maximum propagation time is calculated from the time the "
          "topic is first referenced in the client, e.g., on produce().",
          0, 60*60*1000, 30*1000 },
        { _RK_GLOBAL, "topic.blacklist", _RK_C_PATLIST,
          _RK(topic_blacklist),
          "Topic blacklist, a comma-separated list of regular expressions "
          "for matching topic names that should be ignored in "
          "broker metadata information as if the topics did not exist." },
	{ _RK_GLOBAL|_RK_MED, "debug", _RK_C_S2F, _RK(debug),
	  "A comma-separated list of debug contexts to enable. "
          "Detailed Producer debugging: broker,topic,msg. "
          "Consumer: consumer,cgrp,topic,fetch",
	  .s2i = {
                        { RD_KAFKA_DBG_GENERIC,  "generic" },
			{ RD_KAFKA_DBG_BROKER,   "broker" },
			{ RD_KAFKA_DBG_TOPIC,    "topic" },
			{ RD_KAFKA_DBG_METADATA, "metadata" },
                        { RD_KAFKA_DBG_FEATURE,  "feature" },
			{ RD_KAFKA_DBG_QUEUE,    "queue" },
			{ RD_KAFKA_DBG_MSG,      "msg" },
			{ RD_KAFKA_DBG_PROTOCOL, "protocol" },
                        { RD_KAFKA_DBG_CGRP,     "cgrp" },
			{ RD_KAFKA_DBG_SECURITY, "security" },
			{ RD_KAFKA_DBG_FETCH,    "fetch" },
                        { RD_KAFKA_DBG_INTERCEPTOR, "interceptor" },
                        { RD_KAFKA_DBG_PLUGIN,   "plugin" },
                        { RD_KAFKA_DBG_CONSUMER, "consumer" },
                        { RD_KAFKA_DBG_ADMIN,    "admin" },
                        { RD_KAFKA_DBG_EOS,      "eos" },
                        { RD_KAFKA_DBG_MOCK,     "mock" },
                        { RD_KAFKA_DBG_ASSIGNOR, "assignor" },
                        { RD_KAFKA_DBG_CONF,     "conf" },
			{ RD_KAFKA_DBG_ALL,      "all" }
		} },
	{ _RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
	  "Default timeout for network requests. "
          "Producer: ProduceRequests will use the lesser value of "
          "`socket.timeout.ms` and remaining `message.timeout.ms` for the "
          "first message in the batch. "
          "Consumer: FetchRequests will use "
          "`fetch.wait.max.ms` + `socket.timeout.ms`. "
          "Admin: Admin requests will use `socket.timeout.ms` or explicitly "
          "set `rd_kafka_AdminOptions_set_operation_timeout()` value.",
	  10, 300*1000, 60*1000 },
        { _RK_GLOBAL|_RK_DEPRECATED, "socket.blocking.max.ms", _RK_C_INT,
          _RK(socket_blocking_max_ms),
          "No longer used.",
          1, 60*1000, 1000 },
	{ _RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT,
	  _RK(socket_sndbuf_size),
	  "Broker socket send buffer size. System default is used if 0.",
	  0, 100000000, 0 },
	{ _RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT,
	  _RK(socket_rcvbuf_size),
	  "Broker socket receive buffer size. System default is used if 0.",
	  0, 100000000, 0 },
	{ _RK_GLOBAL, "socket.keepalive.enable", _RK_C_BOOL,
	  _RK(socket_keepalive),
          "Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets",
          0, 1, 0
#ifndef SO_KEEPALIVE
          , .unsupported = "SO_KEEPALIVE not available at build time"
#endif
        },
	{ _RK_GLOBAL, "socket.nagle.disable", _RK_C_BOOL,
	  _RK(socket_nagle_disable),
          "Disable the Nagle algorithm (TCP_NODELAY) on broker sockets.",
          0, 1, 0
#ifndef TCP_NODELAY
          , .unsupported = "TCP_NODELAY not available at build time"
#endif
        },
        { _RK_GLOBAL, "socket.max.fails", _RK_C_INT,
          _RK(socket_max_fails),
          "Disconnect from broker when this number of send failures "
          "(e.g., timed out requests) is reached. Disable with 0. "
          "WARNING: It is highly recommended to leave this setting at "
          "its default value of 1 to avoid the client and broker to "
          "become desynchronized in case of request timeouts. "
          "NOTE: The connection is automatically re-established.",
          0, 1000000, 1 },
	{ _RK_GLOBAL, "broker.address.ttl", _RK_C_INT,
	  _RK(broker_addr_ttl),
	  "How long to cache the broker address resolving "
          "results (milliseconds).",
	  0, 86400*1000, 1*1000 },
        { _RK_GLOBAL, "broker.address.family", _RK_C_S2I,
          _RK(broker_addr_family),
          "Allowed broker IP address families: any, v4, v6",
          .vdef = AF_UNSPEC,
          .s2i = {
                        { AF_UNSPEC, "any" },
                        { AF_INET, "v4" },
                        { AF_INET6, "v6" },
                } },
        { _RK_GLOBAL|_RK_MED|_RK_HIDDEN, "enable.sparse.connections",
          _RK_C_BOOL,
          _RK(sparse_connections),
          "When enabled the client will only connect to brokers "
          "it needs to communicate with. When disabled the client "
          "will maintain connections to all brokers in the cluster.",
          0, 1, 1 },
        { _RK_GLOBAL|_RK_DEPRECATED, "reconnect.backoff.jitter.ms", _RK_C_INT,
          _RK(reconnect_jitter_ms),
          "No longer used. See `reconnect.backoff.ms` and "
          "`reconnect.backoff.max.ms`.",
          0, 60*60*1000, 0 },
        { _RK_GLOBAL|_RK_MED, "reconnect.backoff.ms", _RK_C_INT,
          _RK(reconnect_backoff_ms),
          "The initial time to wait before reconnecting to a broker "
          "after the connection has been closed. "
          "The time is increased exponentially until "
          "`reconnect.backoff.max.ms` is reached. "
          "-25% to +50% jitter is applied to each reconnect backoff. "
          "A value of 0 disables the backoff and reconnects immediately.",
          0, 60*60*1000, 100 },
        { _RK_GLOBAL|_RK_MED, "reconnect.backoff.max.ms", _RK_C_INT,
          _RK(reconnect_backoff_max_ms),
          "The maximum time to wait before reconnecting to a broker "
          "after the connection has been closed.",
          0, 60*60*1000, 10*1000 },
	{ _RK_GLOBAL|_RK_HIGH, "statistics.interval.ms", _RK_C_INT,
	  _RK(stats_interval_ms),
	  "librdkafka statistics emit interval. The application also needs to "
	  "register a stats callback using `rd_kafka_conf_set_stats_cb()`. "
	  "The granularity is 1000ms. A value of 0 disables statistics.",
	  0, 86400*1000, 0 },
	{ _RK_GLOBAL, "enabled_events", _RK_C_INT,
	  _RK(enabled_events),
	  "See `rd_kafka_conf_set_events()`",
	  0, 0x7fffffff, 0 },
	{ _RK_GLOBAL, "error_cb", _RK_C_PTR,
	  _RK(error_cb),
	  "Error callback (set with rd_kafka_conf_set_error_cb())" },
	{ _RK_GLOBAL, "throttle_cb", _RK_C_PTR,
	  _RK(throttle_cb),
	  "Throttle callback (set with rd_kafka_conf_set_throttle_cb())" },
	{ _RK_GLOBAL, "stats_cb", _RK_C_PTR,
	  _RK(stats_cb),
	  "Statistics callback (set with rd_kafka_conf_set_stats_cb())" },
	{ _RK_GLOBAL, "log_cb", _RK_C_PTR,
	  _RK(log_cb),
	  "Log callback (set with rd_kafka_conf_set_log_cb())",
          .pdef = rd_kafka_log_print },
        { _RK_GLOBAL, "log_level", _RK_C_INT,
          _RK(log_level),
          "Logging level (syslog(3) levels)",
          0, 7, 6 },
        { _RK_GLOBAL, "log.queue", _RK_C_BOOL, _RK(log_queue),
          "Disable spontaneous log_cb from internal librdkafka "
          "threads, instead enqueue log messages on queue set with "
          "`rd_kafka_set_log_queue()` and serve log callbacks or "
          "events through the standard poll APIs. "
          "**NOTE**: Log messages will linger in a temporary queue "
          "until the log queue has been set.",
          0, 1, 0 },
	{ _RK_GLOBAL, "log.thread.name", _RK_C_BOOL,
	  _RK(log_thread_name),
	  "Print internal thread name in log messages "
	  "(useful for debugging librdkafka internals)",
	  0, 1, 1 },
        { _RK_GLOBAL, "enable.random.seed", _RK_C_BOOL,
          _RK(enable_random_seed),
          "If enabled librdkafka will initialize the PRNG "
          "with srand(current_time.milliseconds) on the first invocation of "
          "rd_kafka_new() (required only if rand_r() is not available on your platform). "
          "If disabled the application must call srand() prior to calling rd_kafka_new().",
          0, 1, 1 },
	{ _RK_GLOBAL, "log.connection.close", _RK_C_BOOL,
	  _RK(log_connection_close),
	  "Log broker disconnects. "
          "It might be useful to turn this off when interacting with "
          "0.9 brokers with an aggressive `connection.max.idle.ms` value.",
	  0, 1, 1 },
        { _RK_GLOBAL, "background_event_cb", _RK_C_PTR,
          _RK(background_event_cb),
          "Background queue event callback "
          "(set with rd_kafka_conf_set_background_event_cb())" },
        { _RK_GLOBAL, "socket_cb", _RK_C_PTR,
          _RK(socket_cb),
          "Socket creation callback to provide race-free CLOEXEC",
          .pdef =
#ifdef __linux__
          rd_kafka_socket_cb_linux
#else
          rd_kafka_socket_cb_generic
#endif
        },
        { _RK_GLOBAL, "connect_cb", _RK_C_PTR,
          _RK(connect_cb),
          "Socket connect callback",
        },
        { _RK_GLOBAL, "closesocket_cb", _RK_C_PTR,
          _RK(closesocket_cb),
          "Socket close callback",
        },
        { _RK_GLOBAL, "open_cb", _RK_C_PTR,
          _RK(open_cb),
          "File open callback to provide race-free CLOEXEC",
          .pdef =
#ifdef __linux__
          rd_kafka_open_cb_linux
#else
          rd_kafka_open_cb_generic
#endif
        },
	{ _RK_GLOBAL, "opaque", _RK_C_PTR,
	  _RK(opaque),
	  "Application opaque (set with rd_kafka_conf_set_opaque())" },
        { _RK_GLOBAL, "default_topic_conf", _RK_C_PTR,
          _RK(topic_conf),
          "Default topic configuration for automatically subscribed topics" },
	{ _RK_GLOBAL, "internal.termination.signal", _RK_C_INT,
	  _RK(term_sig),
	  "Signal that librdkafka will use to quickly terminate on "
	  "rd_kafka_destroy(). If this signal is not set then there will be a "
	  "delay before rd_kafka_wait_destroyed() returns true "
	  "as internal threads are timing out their system calls. "
	  "If this signal is set however the delay will be minimal. "
	  "The application should mask this signal as an internal "
	  "signal handler is installed.",
	  0, 128, 0 },
	{ _RK_GLOBAL|_RK_HIGH, "api.version.request", _RK_C_BOOL,
	  _RK(api_version_request),
	  "Request broker's supported API versions to adjust functionality to "
	  "available protocol features. If set to false, or the "
          "ApiVersionRequest fails, the fallback version "
	  "`broker.version.fallback` will be used. "
	  "**NOTE**: Depends on broker version >=0.10.0. If the request is not "
	  "supported by (an older) broker the `broker.version.fallback` fallback is used.",
	  0, 1, 1 },
	{ _RK_GLOBAL, "api.version.request.timeout.ms", _RK_C_INT,
	  _RK(api_version_request_timeout_ms),
	  "Timeout for broker API version requests.",
	  1, 5*60*1000, 10*1000 },
	{ _RK_GLOBAL|_RK_MED, "api.version.fallback.ms", _RK_C_INT,
	  _RK(api_version_fallback_ms),
	  "Dictates how long the `broker.version.fallback` fallback is used "
	  "in the case the ApiVersionRequest fails. "
	  "**NOTE**: The ApiVersionRequest is only issued when a new connection "
	  "to the broker is made (such as after an upgrade).",
	  0, 86400*7*1000, 0 },

	{ _RK_GLOBAL|_RK_MED, "broker.version.fallback", _RK_C_STR,
	  _RK(broker_version_fallback),
	  "Older broker versions (before 0.10.0) provide no way for a client to query "
	  "for supported protocol features "
	  "(ApiVersionRequest, see `api.version.request`) making it impossible "
	  "for the client to know what features it may use. "
	  "As a workaround a user may set this property to the expected broker "
	  "version and the client will automatically adjust its feature set "
	  "accordingly if the ApiVersionRequest fails (or is disabled). "
	  "The fallback broker version will be used for `api.version.fallback.ms`. "
          "Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. "
          "Any other value >= 0.10, such as 0.10.2.1, "
          "enables ApiVersionRequests.",
          .sdef = "0.10.0",
	  .validate = rd_kafka_conf_validate_broker_version },

	/* Security related global properties */
	{ _RK_GLOBAL|_RK_HIGH, "security.protocol", _RK_C_S2I,
	  _RK(security_protocol),
	  "Protocol used to communicate with brokers.",
	  .vdef = RD_KAFKA_PROTO_PLAINTEXT,
	  .s2i = {
			{ RD_KAFKA_PROTO_PLAINTEXT, "plaintext" },
			{ RD_KAFKA_PROTO_SSL, "ssl", _UNSUPPORTED_SSL },
			{ RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext" },
			{ RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl",
                          _UNSUPPORTED_SSL },
			{ 0, NULL }
		} },

	{ _RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR,
	  _RK(ssl.cipher_suites),
	  "A cipher suite is a named combination of authentication, "
	  "encryption, MAC and key exchange algorithm used to negotiate the "
	  "security settings for a network connection using TLS or SSL network "
	  "protocol. See manual page for `ciphers(1)` and "
	  "`SSL_CTX_set_cipher_list(3).",
          _UNSUPPORTED_SSL
	},
        { _RK_GLOBAL, "ssl.curves.list", _RK_C_STR,
          _RK(ssl.curves_list),
          "The supported-curves extension in the TLS ClientHello message specifies "
          "the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client "
          "is willing to have the server use. See manual page for "
          "`SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required.",
          _UNSUPPORTED_OPENSSL_1_0_2
        },
        { _RK_GLOBAL, "ssl.sigalgs.list", _RK_C_STR,
          _RK(ssl.sigalgs_list),
          "The client uses the TLS ClientHello signature_algorithms extension "
          "to indicate to the server which signature/hash algorithm pairs "
          "may be used in digital signatures. See manual page for "
          "`SSL_CTX_set1_sigalgs_list(3)`. OpenSSL >= 1.0.2 required.",
          _UNSUPPORTED_OPENSSL_1_0_2
        },
        { _RK_GLOBAL|_RK_SENSITIVE, "ssl.key.location", _RK_C_STR,
          _RK(ssl.key_location),
          "Path to client's private key (PEM) used for authentication.",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL|_RK_SENSITIVE, "ssl.key.password", _RK_C_STR,
          _RK(ssl.key_password),
          "Private key passphrase (for use with `ssl.key.location` "
          "and `set_ssl_cert()`)",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL|_RK_SENSITIVE, "ssl.key.pem", _RK_C_STR,
          _RK(ssl.key_pem),
          "Client's private key string (PEM format) used for authentication.",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL|_RK_SENSITIVE, "ssl_key", _RK_C_INTERNAL,
          _RK(ssl.key),
          "Client's private key as set by rd_kafka_conf_set_ssl_cert()",
          .dtor = rd_kafka_conf_cert_dtor,
          .copy = rd_kafka_conf_cert_copy,
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL, "ssl.certificate.location", _RK_C_STR,
          _RK(ssl.cert_location),
          "Path to client's public key (PEM) used for authentication.",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL, "ssl.certificate.pem", _RK_C_STR,
          _RK(ssl.cert_pem),
          "Client's public key string (PEM format) used for authentication.",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL, "ssl_certificate", _RK_C_INTERNAL,
          _RK(ssl.key),
          "Client's public key as set by rd_kafka_conf_set_ssl_cert()",
          .dtor = rd_kafka_conf_cert_dtor,
          .copy = rd_kafka_conf_cert_copy,
          _UNSUPPORTED_SSL
        },

        { _RK_GLOBAL, "ssl.ca.location", _RK_C_STR,
          _RK(ssl.ca_location),
          "File or directory path to CA certificate(s) for verifying "
          "the broker's key. "
          "Defaults: "
          "On Windows the system's CA certificates are automatically looked "
          "up in the Windows Root certificate store. "
          "On Mac OSX it is recommended to install openssl using Homebrew, "
          "to provide CA certificates. "
          "On Linux install the distribution's ca-certificates package. "
          "If OpenSSL is statically linked or `ssl.ca.location` is set to "
          "`probe` a list of standard paths will be probed and the first one "
          "found will be used as the default CA certificate location path. "
          "If OpenSSL is dynamically linked the OpenSSL library's default "
          "path will be used (see `OPENSSLDIR` in `openssl version -a`).",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL, "ssl_ca", _RK_C_INTERNAL,
          _RK(ssl.ca),
          "CA certificate as set by rd_kafka_conf_set_ssl_cert()",
          .dtor = rd_kafka_conf_cert_dtor,
          .copy = rd_kafka_conf_cert_copy,
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL, "ssl.ca.certificate.stores", _RK_C_STR,
          _RK(ssl.ca_cert_stores),
          "Comma-separated list of Windows Certificate stores to load "
          "CA certificates from. Certificates will be loaded in the same "
          "order as stores are specified. If no certificates can be loaded "
          "from any of the specified stores an error is logged and the "
          "OpenSSL library's default CA location is used instead. "
          "Store names are typically one or more of: MY, Root, Trust, CA.",
          .sdef = "Root",
#if !defined(_WIN32)
          .unsupported = "configuration only valid on Windows"
#endif
        },

        { _RK_GLOBAL, "ssl.crl.location", _RK_C_STR,
          _RK(ssl.crl_location),
          "Path to CRL for verifying broker's certificate validity.",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL, "ssl.keystore.location", _RK_C_STR,
          _RK(ssl.keystore_location),
          "Path to client's keystore (PKCS#12) used for authentication.",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL|_RK_SENSITIVE, "ssl.keystore.password", _RK_C_STR,
          _RK(ssl.keystore_password),
          "Client's keystore (PKCS#12) password.",
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL, "enable.ssl.certificate.verification", _RK_C_BOOL,
          _RK(ssl.enable_verify),
          "Enable OpenSSL's builtin broker (server) certificate verification. "
          "This verification can be extended by the application by "
          "implementing a certificate_verify_cb.",
          0, 1, 1,
          _UNSUPPORTED_SSL
        },
        { _RK_GLOBAL, "ssl.endpoint.identification.algorithm", _RK_C_S2I,
          _RK(ssl.endpoint_identification),
          "Endpoint identification algorithm to validate broker "
          "hostname using broker certificate. "
          "https - Server (broker) hostname verification as "
          "specified in RFC2818. "
          "none - No endpoint verification. "
          "OpenSSL >= 1.0.2 required.",
          .vdef = RD_KAFKA_SSL_ENDPOINT_ID_NONE,
          .s2i = {
                        { RD_KAFKA_SSL_ENDPOINT_ID_NONE, "none" },
                        { RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https" }
                },
          _UNSUPPORTED_OPENSSL_1_0_2
        },
        { _RK_GLOBAL, "ssl.certificate.verify_cb", _RK_C_PTR,
          _RK(ssl.cert_verify_cb),
          "Callback to verify the broker certificate chain.",
          _UNSUPPORTED_SSL
        },

        /* Point user in the right direction if they try to apply
         * Java client SSL / JAAS properties. */
        { _RK_GLOBAL, "ssl.truststore.location", _RK_C_INVALID,
          _RK(dummy),
          "Java TrustStores are not supported, use `ssl.ca.location` "
          "and a certificate file instead. "
          "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka "
          "for more information."
        },
        { _RK_GLOBAL, "sasl.jaas.config", _RK_C_INVALID,
          _RK(dummy),
          "Java JAAS configuration is not supported, see "
          "https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka "
          "for more information."
        },

	{_RK_GLOBAL|_RK_HIGH, "sasl.mechanisms", _RK_C_STR,
	 _RK(sasl.mechanisms),
	 "SASL mechanism to use for authentication. "
	 "Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. "
	 "**NOTE**: Despite the name only one mechanism must be configured.",
	 .sdef = "GSSAPI",
	 .validate = rd_kafka_conf_validate_single },
        {_RK_GLOBAL|_RK_HIGH, "sasl.mechanism", _RK_C_ALIAS,
         .sdef = "sasl.mechanisms" },
	{ _RK_GLOBAL, "sasl.kerberos.service.name", _RK_C_STR,
	  _RK(sasl.service_name),
	  "Kerberos principal name that Kafka runs as, "
          "not including /hostname@REALM",
	  .sdef = "kafka" },
	{ _RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR,
	  _RK(sasl.principal),
          "This client's Kerberos principal name. "
          "(Not supported on Windows, will use the logon user's principal).",
	  .sdef = "kafkaclient" },
        { _RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR,
          _RK(sasl.kinit_cmd),
          "Shell command to refresh or acquire the client's Kerberos ticket. "
          "This command is executed on client creation and every "
          "sasl.kerberos.min.time.before.relogin (0=disable). "
          "%{config.prop.name} is replaced by corresponding config "
          "object value.",
          .sdef =
          /* First attempt to refresh, else acquire. */
          "kinit -R -t \"%{sasl.kerberos.keytab}\" "
          "-k %{sasl.kerberos.principal} || "
          "kinit -t \"%{sasl.kerberos.keytab}\" -k %{sasl.kerberos.principal}",
          _UNSUPPORTED_WIN32_GSSAPI
        },
        { _RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR,
          _RK(sasl.keytab),
          "Path to Kerberos keytab file. "
          "This configuration property is only used as a variable in "
          "`sasl.kerberos.kinit.cmd` as "
          "` ... -t \"%{sasl.kerberos.keytab}\"`.",
          _UNSUPPORTED_WIN32_GSSAPI
        },
        { _RK_GLOBAL, "sasl.kerberos.min.time.before.relogin", _RK_C_INT,
          _RK(sasl.relogin_min_time),
          "Minimum time in milliseconds between key refresh attempts. "
          "Disable automatic key refresh by setting this property to 0.",
          0, 86400*1000, 60*1000,
          _UNSUPPORTED_WIN32_GSSAPI
        },
        { _RK_GLOBAL|_RK_HIGH|_RK_SENSITIVE, "sasl.username", _RK_C_STR,
          _RK(sasl.username),
          "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms" },
        { _RK_GLOBAL|_RK_HIGH|_RK_SENSITIVE, "sasl.password", _RK_C_STR,
          _RK(sasl.password),
          "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism" },
        { _RK_GLOBAL|_RK_SENSITIVE, "sasl.oauthbearer.config", _RK_C_STR,
          _RK(sasl.oauthbearer_config),
          "SASL/OAUTHBEARER configuration. The format is "
          "implementation-dependent and must be parsed accordingly. The "
          "default unsecured token implementation (see "
          "https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes "
          "space-separated name=value pairs with valid names including "
          "principalClaimName, principal, scopeClaimName, scope, and "
          "lifeSeconds. The default value for principalClaimName is \"sub\", "
          "the default value for scopeClaimName is \"scope\", and the default "
          "value for lifeSeconds is 3600. The scope value is CSV format with "
          "the default value being no/empty scope. For example: "
          "`principalClaimName=azp principal=admin scopeClaimName=roles "
          "scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions "
          "can be communicated to the broker via "
          "`extension_NAME=value`. For example: "
          "`principal=admin extension_traceId=123`",
          _UNSUPPORTED_OAUTHBEARER
        },
        { _RK_GLOBAL, "enable.sasl.oauthbearer.unsecure.jwt", _RK_C_BOOL,
          _RK(sasl.enable_oauthbearer_unsecure_jwt),
          "Enable the builtin unsecure JWT OAUTHBEARER token handler "
          "if no oauthbearer_refresh_cb has been set. "
          "This builtin handler should only be used for development "
          "or testing, and not in production.",
          0, 1, 0,
          _UNSUPPORTED_OAUTHBEARER
        },
        /* Pointer-typed property holding the application's OAUTHBEARER
         * token refresh callback. The description below is user-facing
         * text; its parenthetical must be balanced. */
        { _RK_GLOBAL, "oauthbearer_token_refresh_cb", _RK_C_PTR,
          _RK(sasl.oauthbearer_token_refresh_cb),
          "SASL/OAUTHBEARER token refresh callback (set with "
          "rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by "
          "rd_kafka_poll(), et.al). "
          "This callback will be triggered when it is time to refresh "
          "the client's OAUTHBEARER token.",
          _UNSUPPORTED_OAUTHBEARER
        },

        /* Plugins */
        { _RK_GLOBAL, "plugin.library.paths", _RK_C_STR,
          _RK(plugin_paths),
          "List of plugin libraries to load (; separated). "
          "The library search path is platform dependent (see dlopen(3) for "
          "Unix and LoadLibrary() for Windows). If no filename extension is "
          "specified the platform-specific extension (such as .dll or .so) "
          "will be appended automatically.",
#if WITH_PLUGINS
          .set = rd_kafka_plugins_conf_set
#else
          .unsupported = "libdl/dlopen(3) not available at build time"
#endif
        },

        /* Interceptors are added through specific API and not exposed
         * as configuration properties.
         * The interceptor property must be defined after plugin.library.paths
         * so that the plugin libraries are properly loaded before
         * interceptors are configured when duplicating configuration objects.*/
        { _RK_GLOBAL, "interceptors", _RK_C_INTERNAL,
          _RK(interceptors),
          "Interceptors added through rd_kafka_conf_interceptor_add_..() "
          "and any configuration handled by interceptors.",
          .ctor = rd_kafka_conf_interceptor_ctor,
          .dtor = rd_kafka_conf_interceptor_dtor,
          .copy = rd_kafka_conf_interceptor_copy },

        /* Test mocks. */
        { _RK_GLOBAL|_RK_HIDDEN, "test.mock.num.brokers", _RK_C_INT,
          _RK(mock.broker_cnt),
          "Number of mock brokers to create. "
          "This will automatically overwrite `bootstrap.servers` with the "
          "mock broker list.",
          0, 10000, 0 },

        /* Unit test interfaces.
         * These are not part of the public API and may change at any time.
         * Only to be used by the librdkafka tests. */
        { _RK_GLOBAL|_RK_HIDDEN, "ut_handle_ProduceResponse", _RK_C_PTR,
          _RK(ut.handle_ProduceResponse),
          "ProduceResponse handler: "
          "rd_kafka_resp_err_t (*cb) (rd_kafka_t *rk, "
          "int32_t brokerid, uint64_t msgid, rd_kafka_resp_err_t err)" },

        /* Global consumer group properties */
        { _RK_GLOBAL|_RK_CGRP|_RK_HIGH, "group.id", _RK_C_STR,
          _RK(group_id_str),
          "Client group id string. All clients sharing the same group.id "
          "belong to the same group." },
        { _RK_GLOBAL|_RK_CGRP|_RK_MED,
          "group.instance.id", _RK_C_STR,
          _RK(group_instance_id),
          "Enable static group membership. "
          "Static group members are able to leave and rejoin a group "
          "within the configured `session.timeout.ms` without prompting a "
          "group rebalance. This should be used in combination with a larger "
          "`session.timeout.ms` to avoid group rebalances caused by transient "
          "unavailability (e.g. process restarts). "
          "Requires broker version >= 2.3.0."},
        { _RK_GLOBAL|_RK_CGRP|_RK_MED, "partition.assignment.strategy",
          _RK_C_STR,
          _RK(partition_assignment_strategy),
          "The name of one or more partition assignment strategies. The "
          "elected group leader will use a strategy supported by all "
          "members of the group to assign partitions to group members. If "
          "there is more than one eligible strategy, preference is "
          "determined by the order of this list (strategies earlier in the "
          "list have higher priority). "
          "Cooperative and non-cooperative (eager) strategies must not be "
          "mixed. "
          "Available strategies: range, roundrobin, cooperative-sticky.",
          .sdef = "range,roundrobin" },
        /* Consumer group session timeout (integer property).
         * The description is user-facing documentation text. */
        { _RK_GLOBAL|_RK_CGRP|_RK_HIGH, "session.timeout.ms", _RK_C_INT,
          _RK(group_session_timeout_ms),
          "Client group session and failure detection timeout. "
          "The consumer sends periodic heartbeats (heartbeat.interval.ms) "
          "to indicate its liveness to the broker. If no heartbeats are "
          "received by the broker for a group member within the "
          "session timeout, the broker will remove the consumer from "
          "the group and trigger a rebalance. "
          "The allowed range is configured with the **broker** configuration "
          "properties `group.min.session.timeout.ms` and "
          "`group.max.session.timeout.ms`. "
          "Also see `max.poll.interval.ms`.",
          1, 3600*1000, 10*1000 },
        { _RK_GLOBAL|_RK_CGRP, "heartbeat.interval.ms", _RK_C_INT,
          _RK(group_heartbeat_intvl_ms),
          "Group session keepalive heartbeat interval.",
          1, 3600*1000, 3*1000 },
        { _RK_GLOBAL|_RK_CGRP, "group.protocol.type", _RK_C_KSTR,
          _RK(group_protocol_type),
          "Group protocol type. NOTE: Currently, the only supported group "
          "protocol type is `consumer`.",
          .sdef = "consumer" },
        { _RK_GLOBAL|_RK_CGRP, "coordinator.query.interval.ms", _RK_C_INT,
          _RK(coord_query_intvl_ms),
          "How often to query for the current client group coordinator. "
          "If the currently assigned coordinator is down the configured "
          "query interval will be divided by ten to more quickly recover "
          "in case of coordinator reassignment.",
          1, 3600*1000, 10*60*1000 },
        /* Maximum time allowed between consume calls for high-level
         * consumers (integer property).
         * The description is user-facing documentation text. */
        { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "max.poll.interval.ms", _RK_C_INT,
          _RK(max_poll_interval_ms),
          "Maximum allowed time between calls to consume messages "
          "(e.g., rd_kafka_consumer_poll()) for high-level consumers. "
          "If this interval is exceeded the consumer is considered failed "
          "and the group will rebalance in order to reassign the "
          "partitions to another consumer group member. "
          "Warning: Offset commits may not be possible at this point. "
          "Note: It is recommended to set `enable.auto.offset.store=false` "
          "for long-time processing applications and then explicitly store "
          "offsets (using offsets_store()) *after* message processing, to "
          "make sure offsets are not auto-committed prior to processing "
          "has finished. "
          "The interval is checked two times per second. "
          "See KIP-62 for more information.",
          1, 86400*1000, 300000
        },

        /* Global consumer properties */
        { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "enable.auto.commit", _RK_C_BOOL,
          _RK(enable_auto_commit),
          "Automatically and periodically commit offsets in the background. "
          "Note: setting this to false does not prevent the consumer from "
          "fetching previously committed start offsets. To circumvent this "
          "behaviour set specific start offsets per partition in the call "
          "to assign().",
          0, 1, 1 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "auto.commit.interval.ms",
          _RK_C_INT,
	  _RK(auto_commit_interval_ms),
	  "The frequency in milliseconds that the consumer offsets "
	  "are committed (written) to offset storage. (0 = disable). "
          "This setting is used by the high-level consumer.",
          0, 86400*1000, 5*1000 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "enable.auto.offset.store",
          _RK_C_BOOL,
          _RK(enable_auto_offset_store),
          "Automatically store offset of last message provided to "
          "application. "
          "The offset store is an in-memory store of the next offset to "
          "(auto-)commit for each partition.",
          0, 1, 1 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "queued.min.messages", _RK_C_INT,
	  _RK(queued_min_msgs),
	  "Minimum number of messages per topic+partition "
          "librdkafka tries to maintain in the local consumer queue.",
	  1, 10000000, 100000 },
	{ _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "queued.max.messages.kbytes",
          _RK_C_INT,
	  _RK(queued_max_msg_kbytes),
          "Maximum number of kilobytes of queued pre-fetched messages "
          "in the local consumer queue. "
          "If using the high-level consumer this setting applies to the "
          "single consumer queue, regardless of the number of partitions. "
          "When using the legacy simple consumer or when separate "
          "partition queues are used this setting applies per partition. "
	  "This value may be overshot by fetch.message.max.bytes. "
	  "This property has higher priority than queued.min.messages.",
          1, INT_MAX/1024, 0x10000/*64MB*/ },
        { _RK_GLOBAL|_RK_CONSUMER, "fetch.wait.max.ms", _RK_C_INT,
	  _RK(fetch_wait_max_ms),
	  "Maximum time the broker may wait to fill the Fetch response "
	  "with fetch.min.bytes of messages.",
	  0, 300*1000, 500 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.message.max.bytes",
          _RK_C_INT,
          _RK(fetch_msg_max_bytes),
          "Initial maximum number of bytes per topic+partition to request when "
          "fetching messages from the broker. "
	  "If the client encounters a message larger than this value "
	  "it will gradually try to increase it until the "
	  "entire message can be fetched.",
          1, 1000000000, 1024*1024 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "max.partition.fetch.bytes",
          _RK_C_ALIAS,
	  .sdef = "fetch.message.max.bytes" },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.max.bytes", _RK_C_INT,
          _RK(fetch_max_bytes),
          "Maximum amount of data the broker shall return for a Fetch request. "
          "Messages are fetched in batches by the consumer and if the first "
          "message batch in the first non-empty partition of the Fetch request "
          "is larger than this value, then the message batch will still be "
          "returned to ensure the consumer can make progress. "
          "The maximum message batch size accepted by the broker is defined "
          "via `message.max.bytes` (broker config) or "
          "`max.message.bytes` (broker topic config). "
          "`fetch.max.bytes` is automatically adjusted upwards to be "
          "at least `message.max.bytes` (consumer config).",
          0, INT_MAX-512, 50*1024*1024 /* 50MB */ },
	{ _RK_GLOBAL|_RK_CONSUMER, "fetch.min.bytes", _RK_C_INT,
	  _RK(fetch_min_bytes),
	  "Minimum number of bytes the broker responds with. "
	  "If fetch.wait.max.ms expires the accumulated data will "
	  "be sent to the client regardless of this setting.",
	  1, 100000000, 1 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.error.backoff.ms", _RK_C_INT,
	  _RK(fetch_error_backoff_ms),
	  "How long to postpone the next fetch request for a "
	  "topic+partition in case of a fetch error.",
	  0, 300*1000, 500 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.method",
          _RK_C_S2I,
          _RK(offset_store_method),
          "Offset commit store method: "
          "'file' - DEPRECATED: local file store (offset.store.path, et.al), "
          "'broker' - broker commit store "
          "(requires Apache Kafka 0.8.2 or later on the broker).",
          .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
          .s2i = {
                        { RD_KAFKA_OFFSET_METHOD_NONE, "none" },
                        { RD_KAFKA_OFFSET_METHOD_FILE, "file" },
                        { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
                }
        },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "isolation.level",
          _RK_C_S2I,
          _RK(isolation_level),
          "Controls how to read messages written transactionally: "
          "`read_committed` - only return transactional messages which have "
          "been committed. `read_uncommitted` - return all messages, even "
          "transactional messages which have been aborted.",
          .vdef = RD_KAFKA_READ_COMMITTED,
          .s2i = {
                        { RD_KAFKA_READ_UNCOMMITTED, "read_uncommitted" },
                        { RD_KAFKA_READ_COMMITTED, "read_committed" }
                }
        },
        { _RK_GLOBAL|_RK_CONSUMER, "consume_cb", _RK_C_PTR,
	  _RK(consume_cb),
	  "Message consume callback (set with rd_kafka_conf_set_consume_cb())"},
	{ _RK_GLOBAL|_RK_CONSUMER, "rebalance_cb", _RK_C_PTR,
	  _RK(rebalance_cb),
	  "Called after consumer group has been rebalanced "
          "(set with rd_kafka_conf_set_rebalance_cb())" },
	{ _RK_GLOBAL|_RK_CONSUMER, "offset_commit_cb", _RK_C_PTR,
	  _RK(offset_commit_cb),
	  "Offset commit result propagation callback. "
          "(set with rd_kafka_conf_set_offset_commit_cb())" },
	{ _RK_GLOBAL|_RK_CONSUMER, "enable.partition.eof", _RK_C_BOOL,
	  _RK(enable_partition_eof),
	  "Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the "
	  "consumer reaches the end of a partition.",
	  0, 1, 0 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "check.crcs", _RK_C_BOOL,
          _RK(check_crcs),
          "Verify CRC32 of consumed messages, ensuring no on-the-wire or "
          "on-disk corruption to the messages occurred. This check comes "
          "at slightly increased CPU usage.",
          0, 1, 0 },
        /* Boolean consumer property controlling automatic topic creation;
         * per its own description the default is false. */
        { _RK_GLOBAL|_RK_CONSUMER, "allow.auto.create.topics", _RK_C_BOOL,
          _RK(allow_auto_create_topics),
          "Allow automatic topic creation on the broker when subscribing to "
          "or assigning non-existent topics. "
          "The broker must also be configured with "
          "`auto.create.topics.enable=true` for this configuration to "
          "take effect. "
          "Note: The default value (false) is different from the "
          "Java consumer (true). "
          "Requires broker version >= 0.11.0.0, for older broker versions "
          "only the broker configuration applies.",
          0, 1, 0 },
        { _RK_GLOBAL, "client.rack", _RK_C_KSTR,
          _RK(client_rack),
          "A rack identifier for this client. This can be any string value "
          "which indicates where this client is physically located. It "
          "corresponds with the broker config `broker.rack`.",
          .sdef =  "" },

        /* Global producer properties */
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "transactional.id", _RK_C_STR,
          _RK(eos.transactional_id),
          "Enables the transactional producer. "
          "The transactional.id is used to identify the same transactional "
          "producer instance across process restarts. "
          "It allows the producer to guarantee that transactions corresponding "
          "to earlier instances of the same producer have been finalized "
          "prior to starting any new transactions, and that any "
          "zombie instances are fenced off. "
          "If no transactional.id is provided, then the producer is limited "
          "to idempotent delivery (if enable.idempotence is set). "
          "Requires broker version >= 0.11.0." },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "transaction.timeout.ms", _RK_C_INT,
          _RK(eos.transaction_timeout_ms),
          "The maximum amount of time in milliseconds that the transaction "
          "coordinator will wait for a transaction status update from the "
          "producer before proactively aborting the ongoing transaction. "
          "If this value is larger than the `transaction.max.timeout.ms` "
          "setting in the broker, the init_transactions() call will fail with "
          "ERR_INVALID_TRANSACTION_TIMEOUT. "
          "The transaction timeout automatically adjusts "
          "`message.timeout.ms` and `socket.timeout.ms`, unless explicitly "
          "configured in which case they must not exceed the "
          "transaction timeout (`socket.timeout.ms` must be at least 100ms "
          "lower than `transaction.timeout.ms`). "
          "This is also the default timeout value if no timeout (-1) is "
          "supplied to the transactional API methods.",
          1000, INT_MAX, 60000 },
        /* Boolean producer property enabling idempotent delivery.
         * The description is built by compile-time string concatenation
         * with RD_KAFKA_IDEMP_MAX_INFLIGHT_STR, so the documented
         * in-flight limit follows that macro. */
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "enable.idempotence", _RK_C_BOOL,
          _RK(eos.idempotence),
          "When set to `true`, the producer will ensure that messages are "
          "successfully produced exactly once and in the original produce "
          "order. "
          "The following configuration properties are adjusted automatically "
          "(if not modified by the user) when idempotence is enabled: "
          "`max.in.flight.requests.per.connection="
          RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "` (must be less than or "
          "equal to " RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "), `retries=INT32_MAX` "
          "(must be greater than 0), `acks=all`, `queuing.strategy=fifo`. "
          "Producer instantiation will fail if user-supplied configuration "
          "is incompatible.",
          0, 1, 0 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_EXPERIMENTAL, "enable.gapless.guarantee",
          _RK_C_BOOL,
          _RK(eos.gapless),
          "When set to `true`, any error that could result in a gap "
          "in the produced message series when a batch of messages fails, "
          "will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop "
          "the producer. "
          "Messages failing due to `message.timeout.ms` are not covered "
          "by this guarantee. "
          "Requires `enable.idempotence=true`.",
          0, 1, 0 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.messages",
          _RK_C_INT,
	  _RK(queue_buffering_max_msgs),
	  "Maximum number of messages allowed on the producer queue. "
	  "This queue is shared by all topics and partitions.",
	  1, 10000000, 100000 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.kbytes",
          _RK_C_INT,
	  _RK(queue_buffering_max_kbytes),
	  "Maximum total message size sum allowed on the producer queue. "
	  "This queue is shared by all topics and partitions. "
	  "This property has higher priority than queue.buffering.max.messages.",
	  1, INT_MAX, 0x100000/*1GB*/ },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.ms",
          _RK_C_DBL,
	  _RK(buffering_max_ms_dbl),
	  "Delay in milliseconds to wait for messages in the producer queue "
          "to accumulate before constructing message batches (MessageSets) to "
          "transmit to brokers. "
	  "A higher value allows larger and more effective "
          "(less overhead, improved compression) batches of messages to "
          "accumulate at the expense of increased message delivery latency.",
	  .dmin = 0, .dmax = 900.0*1000.0, .ddef = 5.0 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "linger.ms", _RK_C_ALIAS,
          .sdef = "queue.buffering.max.ms" },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "message.send.max.retries",
          _RK_C_INT,
	  _RK(max_retries),
	  "How many times to retry sending a failing Message. "
          "**Note:** retrying may cause reordering unless "
          "`enable.idempotence` is set to true.",
          0, INT32_MAX, INT32_MAX },
          { _RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
                .sdef = "message.send.max.retries" },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "retry.backoff.ms", _RK_C_INT,
	  _RK(retry_backoff_ms),
	  "The backoff time in milliseconds before retrying a protocol request.",
	  1, 300*1000, 100 },

	{ _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.backpressure.threshold",
          _RK_C_INT, _RK(queue_backpressure_thres),
          "The threshold of outstanding not yet transmitted broker requests "
          "needed to backpressure the producer's message accumulator. "
          "If the number of not yet transmitted requests equals or exceeds "
          "this number, produce request creation that would have otherwise "
          "been triggered (for example, in accordance with linger.ms) will be "
          "delayed. A lower number yields larger and more effective batches. "
          "A higher value can improve latency when using compression on slow "
          "machines.",
        1, 1000000, 1 },

        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "compression.codec", _RK_C_S2I,
	  _RK(compression_codec),
	  "compression codec to use for compressing message sets. "
	  "This is the default value for all topics, may be overridden by "
	  "the topic configuration property `compression.codec`. ",
	  .vdef = RD_KAFKA_COMPRESSION_NONE,
	  .s2i = {
			{ RD_KAFKA_COMPRESSION_NONE,   "none" },
			{ RD_KAFKA_COMPRESSION_GZIP,   "gzip",
                          _UNSUPPORTED_ZLIB },
			{ RD_KAFKA_COMPRESSION_SNAPPY, "snappy",
                          _UNSUPPORTED_SNAPPY },
			{ RD_KAFKA_COMPRESSION_LZ4, "lz4" },
			{ RD_KAFKA_COMPRESSION_ZSTD, "zstd",
                          _UNSUPPORTED_ZSTD },
			{ 0 }
		}
        },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "compression.type", _RK_C_ALIAS,
          .sdef = "compression.codec" },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "batch.num.messages", _RK_C_INT,
	  _RK(batch_num_messages),
	  "Maximum number of messages batched in one MessageSet. "
	  "The total MessageSet size is also limited by batch.size and "
          "message.max.bytes.",
	  1, 1000000, 10000 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "batch.size", _RK_C_INT,
	  _RK(batch_size),
	  "Maximum size (in bytes) of all messages batched in one MessageSet, "
          "including protocol framing overhead. "
          "This limit is applied after the first message has been added "
          "to the batch, regardless of the first message's size, this is to "
          "ensure that messages that exceed batch.size are produced. "
	  "The total MessageSet size is also limited by batch.num.messages and "
          "message.max.bytes.",
	  1, INT_MAX, 1000000 },
	{ _RK_GLOBAL|_RK_PRODUCER, "delivery.report.only.error", _RK_C_BOOL,
	  _RK(dr_err_only),
	  "Only provide delivery reports for failed messages.",
	  0, 1, 0 },
	{ _RK_GLOBAL|_RK_PRODUCER, "dr_cb", _RK_C_PTR,
	  _RK(dr_cb),
	  "Delivery report callback (set with rd_kafka_conf_set_dr_cb())" },
	{ _RK_GLOBAL|_RK_PRODUCER, "dr_msg_cb", _RK_C_PTR,
	  _RK(dr_msg_cb),
	  "Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())" },
        { _RK_GLOBAL|_RK_PRODUCER, "sticky.partitioning.linger.ms", _RK_C_INT,
          _RK(sticky_partition_linger_ms),
          "Delay in milliseconds to wait to assign new sticky partitions for "
          "each topic. "
          "By default, set to double the time of linger.ms. To disable sticky "
          "behavior, set to 0. "
          "This behavior affects messages with the key NULL in all cases, and "
          "messages with key lengths of zero when the consistent_random "
          "partitioner is in use. "
          "These messages would otherwise be assigned randomly. "
          "A higher value allows for more effective batching of these "
          "messages.",
          0, 900000, 10 },


        /*
         * Topic properties
         */

        /* Topic producer properties */
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "request.required.acks", _RK_C_INT,
          _RKT(required_acks),
          "This field indicates the number of acknowledgements the leader "
          "broker must receive from ISR brokers before responding to the "
          "request: "
          "*0*=Broker does not send any response/ack to client, "
          "*-1* or *all*=Broker will block until message is committed by all "
          "in sync replicas (ISRs). If there are less than "
          "`min.insync.replicas` (broker configuration) in the ISR set the "
          "produce request will fail.",
          -1, 1000, -1,
          .s2i = {
                        { -1, "all" },
                }
        },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "acks", _RK_C_ALIAS,
	  .sdef = "request.required.acks" },

        { _RK_TOPIC|_RK_PRODUCER|_RK_MED, "request.timeout.ms", _RK_C_INT,
	  _RKT(request_timeout_ms),
	  "The ack timeout of the producer request in milliseconds. "
	  "This value is only enforced by the broker and relies "
	  "on `request.required.acks` being != 0.",
	  1, 900*1000, 30*1000 },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "message.timeout.ms", _RK_C_INT,
	  _RKT(message_timeout_ms),
	  "Local message timeout. "
	  "This value is only enforced locally and limits the time a "
	  "produced message waits for successful delivery. "
	  "A time of 0 is infinite. "
	  "This is the maximum time librdkafka may use to deliver a message "
	  "(including retries). Delivery error occurs when either the retry "
	  "count or the message timeout are exceeded. "
          "The message timeout is automatically adjusted to "
          "`transaction.timeout.ms` if `transactional.id` is configured.",
	  0, INT32_MAX, 300*1000 },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "delivery.timeout.ms", _RK_C_ALIAS,
          .sdef = "message.timeout.ms" },
        { _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED|_RK_EXPERIMENTAL,
          "queuing.strategy", _RK_C_S2I,
          _RKT(queuing_strategy),
          "Producer queuing strategy. FIFO preserves produce ordering, "
          "while LIFO prioritizes new messages.",
          .vdef = 0,
          .s2i = {
                        { RD_KAFKA_QUEUE_FIFO, "fifo" },
                        { RD_KAFKA_QUEUE_LIFO, "lifo" }
                }
        },
        { _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED,
          "produce.offset.report", _RK_C_BOOL,
          _RKT(produce_offset_report),
          "No longer used.",
          0, 1, 0 },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "partitioner", _RK_C_STR,
          _RKT(partitioner_str),
          "Partitioner: "
          "`random` - random distribution, "
          "`consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), "
          "`consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), "
          "`murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), "
          "`murmur2_random` - Java Producer compatible Murmur2 hash of key "
          "(NULL keys are randomly partitioned. This is functionally equivalent "
          "to the default partitioner in the Java Producer.), "
          "`fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), "
          "`fnv1a_random` - FNV-1a hash of key (NULL keys are randomly partitioned).",
          .sdef = "consistent_random",
          .validate = rd_kafka_conf_validate_partitioner },
	{ _RK_TOPIC|_RK_PRODUCER, "partitioner_cb", _RK_C_PTR,
	  _RKT(partitioner),
	  "Custom partitioner callback "
	  "(set with rd_kafka_topic_conf_set_partitioner_cb())" },
	{ _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED|_RK_EXPERIMENTAL,
          "msg_order_cmp", _RK_C_PTR,
	  _RKT(msg_order_cmp),
	  "Message queue ordering comparator "
	  "(set with rd_kafka_topic_conf_set_msg_order_cmp()). "
          "Also see `queuing.strategy`." },
	{ _RK_TOPIC, "opaque", _RK_C_PTR,
	  _RKT(opaque),
	  "Application opaque (set with rd_kafka_topic_conf_set_opaque())" },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "compression.codec", _RK_C_S2I,
	  _RKT(compression_codec),
	  "Compression codec to use for compressing message sets. "
          "inherit = inherit global compression.codec configuration.",
	  .vdef = RD_KAFKA_COMPRESSION_INHERIT,
	  .s2i = {
		  { RD_KAFKA_COMPRESSION_NONE, "none" },
		  { RD_KAFKA_COMPRESSION_GZIP, "gzip",
                    _UNSUPPORTED_ZLIB },
		  { RD_KAFKA_COMPRESSION_SNAPPY, "snappy",
                    _UNSUPPORTED_SNAPPY },
		  { RD_KAFKA_COMPRESSION_LZ4, "lz4" },
		  { RD_KAFKA_COMPRESSION_ZSTD, "zstd",
                    _UNSUPPORTED_ZSTD },
		  { RD_KAFKA_COMPRESSION_INHERIT, "inherit" },
		  { 0 }
		}
        },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "compression.type", _RK_C_ALIAS,
          .sdef = "compression.codec" },
        { _RK_TOPIC|_RK_PRODUCER|_RK_MED, "compression.level", _RK_C_INT,
	  _RKT(compression_level),
	  "Compression level parameter for algorithm selected by configuration "
	  "property `compression.codec`. Higher values will result in better "
	  "compression at the cost of more CPU usage. Usable range is "
	  "algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; "
	  "-1 = codec-dependent default compression level.",
	  RD_KAFKA_COMPLEVEL_MIN,
	  RD_KAFKA_COMPLEVEL_MAX,
	  RD_KAFKA_COMPLEVEL_DEFAULT },


        /* Topic consumer properties */
        { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "auto.commit.enable",
          _RK_C_BOOL,
	  _RKT(auto_commit),
	  "[**LEGACY PROPERTY:** This property is used by the simple legacy "
	  "consumer only. When using the high-level KafkaConsumer, the global "
	  "`enable.auto.commit` property must be used instead]. "
	  "If true, periodically commit offset of the last message handed "
	  "to the application. This committed offset will be used when the "
	  "process restarts to pick up where it left off. "
	  "If false, the application will have to call "
	  "`rd_kafka_offset_store()` to store an offset (optional). "
	  "**NOTE:** There is currently no zookeeper integration, offsets "
	  "will be written to broker or local file according to "
          "offset.store.method.",
	  0, 1, 1 },
	{ _RK_TOPIC|_RK_CONSUMER, "enable.auto.commit", _RK_C_ALIAS,
	  .sdef = "auto.commit.enable" },
        { _RK_TOPIC|_RK_CONSUMER|_RK_HIGH, "auto.commit.interval.ms",
          _RK_C_INT,
	  _RKT(auto_commit_interval_ms),
	  "[**LEGACY PROPERTY:** This setting is used by the simple legacy "
	  "consumer only. When using the high-level KafkaConsumer, the "
	  "global `auto.commit.interval.ms` property must be used instead]. "
	  "The frequency in milliseconds that the consumer offsets "
	  "are committed (written) to offset storage.",
	  10, 86400*1000, 60*1000 },
        { _RK_TOPIC|_RK_CONSUMER|_RK_HIGH, "auto.offset.reset", _RK_C_S2I,
	  _RKT(auto_offset_reset),
	  "Action to take when there is no initial offset in offset store "
	  "or the desired offset is out of range: "
	  "'smallest','earliest' - automatically reset the offset to the smallest offset, "
	  "'largest','latest' - automatically reset the offset to the largest offset, "
	  "'error' - trigger an error which is retrieved by consuming messages "
	  "and checking 'message->err'.",
	  .vdef = RD_KAFKA_OFFSET_END,
	  .s2i = {
			{ RD_KAFKA_OFFSET_BEGINNING, "smallest" },
			{ RD_KAFKA_OFFSET_BEGINNING, "earliest" },
			{ RD_KAFKA_OFFSET_BEGINNING, "beginning" },
			{ RD_KAFKA_OFFSET_END, "largest" },
			{ RD_KAFKA_OFFSET_END, "latest" },
			{ RD_KAFKA_OFFSET_END, "end" },
			{ RD_KAFKA_OFFSET_INVALID, "error" },
		}
	},
        { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.path",
          _RK_C_STR,
	  _RKT(offset_store_path),
	  "Path to local file for storing offsets. If the path is a directory "
	  "a filename will be automatically generated in that directory based "
	  "on the topic and partition. "
          "File-based offset storage will be removed in a future version.",
	  .sdef = "." },

        { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED,
          "offset.store.sync.interval.ms", _RK_C_INT,
	  _RKT(offset_store_sync_interval_ms),
	  "fsync() interval for the offset file, in milliseconds. "
	  "Use -1 to disable syncing, and 0 for immediate sync after "
	  "each write. "
          "File-based offset storage will be removed in a future version.",
	  -1, 86400*1000, -1 },

        { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.method",
          _RK_C_S2I,
          _RKT(offset_store_method),
          "Offset commit store method: "
          "'file' - DEPRECATED: local file store (offset.store.path, et.al), "
          "'broker' - broker commit store "
          "(requires \"group.id\" to be configured and "
          "Apache Kafka 0.8.2 or later on the broker.).",
          .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
          .s2i = {
                        { RD_KAFKA_OFFSET_METHOD_FILE, "file" },
                        { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
                }
        },

        { _RK_TOPIC|_RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT,
          _RKT(consume_callback_max_msgs),
          "Maximum number of messages to dispatch in "
          "one `rd_kafka_consume_callback*()` call (0 = unlimited)",
          0, 1000000, 0 },

	{ 0, /* End */ }
};

/**
 * @brief Look up the property named \p name among the properties whose
 *        scope flags intersect \p scope.
 *
 * If the matching entry is an alias the search is restarted from the top of
 * the property table using the alias target name (the alias' \c sdef).
 *
 * @returns the property object for \p name in \p scope, or NULL if not found.
 * @remark does not work with interceptor configs.
 */
const struct rd_kafka_property *
rd_kafka_conf_prop_find (int scope, const char *name) {
        const struct rd_kafka_property *entry = rd_kafka_properties;

        while (entry->name) {
                if ((entry->scope & scope) && !strcmp(entry->name, name)) {
                        if (entry->type != _RK_C_ALIAS)
                                return entry;

                        /* Caller supplied an alias: rewind to the first
                         * table entry and search for the real name. */
                        name  = entry->sdef;
                        entry = rd_kafka_properties;
                        continue;
                }

                entry++;
        }

        return NULL;
}

/**
 * @brief Check whether the global configuration property \p name has been
 *        set/modified on \p conf.
 *
 * @returns rd_true if the property has been set/modified, else rd_false.
 *          rd_false is also returned if \p name is not a known
 *          global property.
 */
rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf,
                                     const char *name) {
        const struct rd_kafka_property *prop =
                rd_kafka_conf_prop_find(_RK_GLOBAL, name);

        if (prop == NULL)
                return rd_false;

        return rd_kafka_anyconf_is_modified(conf, prop);
}


/**
 * @brief Check whether the topic configuration property \p name has been
 *        set/modified on \p conf.
 *
 * @returns the result of rd_kafka_anyconf_is_modified() for the property,
 *          or 0 if \p name is not a known topic property.
 */
static
rd_bool_t rd_kafka_topic_conf_is_modified (const rd_kafka_topic_conf_t *conf,
                                           const char *name) {
        const struct rd_kafka_property *prop;

        prop = rd_kafka_conf_prop_find(_RK_TOPIC, name);
        if (!prop)
                return 0;

        return rd_kafka_anyconf_is_modified(conf, prop);
}



/**
 * @brief Low-level property setter: stores a value for \p prop in \p conf.
 *
 * The steps are, in order:
 *  1. For \c _RK_GLOBAL scope (and property types other than PTR and
 *     INTERNAL) the on_conf_set interceptors are called. Any interceptor
 *     result other than RD_KAFKA_CONF_UNKNOWN is returned as-is and
 *     nothing further is done.
 *  2. If the property has a custom setter (\c prop->set) it is called;
 *     a non-OK result is returned as-is.
 *  3. The value is written to the field at \c prop->offset according to
 *     the property type.
 *  4. The property is marked as modified.
 *
 * @param scope       Configuration scope flags (e.g. _RK_GLOBAL, _RK_TOPIC).
 * @param conf        Configuration object that holds the property field.
 * @param prop        Property to set.
 * @param istr        String value (STR, KSTR, DBL, PATLIST), or the raw
 *                    pointer value for PTR properties. May be NULL, in
 *                    which case string-like types fall back to the
 *                    property default (or NULL).
 * @param ival        Integer value for BOOL, INT, S2I and S2F properties.
 * @param set_mode    For S2F (flags) properties: replace, add or delete
 *                    the bits in \p ival.
 * @param errstr      Error string buffer passed on to interceptors,
 *                    setters and the pattern list parser.
 * @param errstr_size Size of \p errstr.
 *
 * @returns RD_KAFKA_CONF_OK on success, else the error returned by the
 *          interceptor/setter, or RD_KAFKA_CONF_INVALID if a pattern list
 *          could not be created.
 */
static rd_kafka_conf_res_t
rd_kafka_anyconf_set_prop0 (int scope, void *conf,
                            const struct rd_kafka_property *prop,
                            const char *istr, int ival, rd_kafka_conf_set_mode_t set_mode,
                            char *errstr, size_t errstr_size) {
        rd_kafka_conf_res_t res;

/* Pointer to the field located OFFSET bytes into BASE, cast to TYPE. */
#define _RK_PTR(TYPE,BASE,OFFSET)  (TYPE)(void *)(((char *)(BASE))+(OFFSET))

        /* Try interceptors first (only for GLOBAL config) */
        if (scope & _RK_GLOBAL) {
                res = RD_KAFKA_CONF_UNKNOWN;

                if (prop->type != _RK_C_PTR && prop->type != _RK_C_INTERNAL)
                        res = rd_kafka_interceptors_on_conf_set(conf,
                                                                prop->name,
                                                                istr,
                                                                errstr,
                                                                errstr_size);

                if (res != RD_KAFKA_CONF_UNKNOWN)
                        return res;
        }

        /* Custom setter, if any. On success execution continues below
         * so that the property value itself is also stored. */
        if (prop->set) {
                res = prop->set(scope, conf, prop->name, istr,
                                _RK_PTR(void *, conf, prop->offset),
                                set_mode, errstr, errstr_size);

                if (res != RD_KAFKA_CONF_OK)
                        return res;
        }

        switch (prop->type)
        {
        case _RK_C_STR:
        {
                char **strp = _RK_PTR(char **, conf, prop->offset);
                /* Explicit value, or else the property default (if any) */
                const char *src = istr ? istr : prop->sdef;

                if (*strp)
                        rd_free(*strp);

                *strp = src ? rd_strdup(src) : NULL;
                break;
        }

        case _RK_C_KSTR:
        {
                rd_kafkap_str_t **kstrp = _RK_PTR(rd_kafkap_str_t **, conf,
                                                  prop->offset);
                /* Explicit value, or else the property default (if any) */
                const char *src = istr ? istr : prop->sdef;

                if (*kstrp)
                        rd_kafkap_str_destroy(*kstrp);

                *kstrp = src ? rd_kafkap_str_new(src, -1) : NULL;
                break;
        }

        case _RK_C_PTR:
                /* The pointer itself is stored, nothing is copied. */
                *_RK_PTR(const void **, conf, prop->offset) = istr;
                break;

        case _RK_C_BOOL:
        case _RK_C_INT:
        case _RK_C_S2I:
        case _RK_C_S2F:
        {
                int *valp = _RK_PTR(int *, conf, prop->offset);

                if (prop->type != _RK_C_S2F) {
                        /* Single assignment */
                        *valp = ival;
                        break;
                }

                /* Flags: apply according to set mode */
                if (set_mode == _RK_CONF_PROP_SET_REPLACE)
                        *valp = ival;
                else if (set_mode == _RK_CONF_PROP_SET_ADD)
                        *valp |= ival;
                else if (set_mode == _RK_CONF_PROP_SET_DEL)
                        *valp &= ~ival;
                break;
        }

        case _RK_C_DBL:
        {
                double *dvalp = _RK_PTR(double *, conf, prop->offset);
                char *endptr;
                double new_val;

                if (!istr) {
                        *dvalp = prop->ddef;
                        break;
                }

                new_val = strtod(istr, &endptr);
                /* This is verified in set_prop() */
                rd_assert(endptr != istr);
                *dvalp = new_val;
                break;
        }

        case _RK_C_PATLIST:
        {
                /* The string is handed to rd_kafka_pattern_list_new() and
                 * the resulting list replaces any previous list. */
                rd_kafka_pattern_list_t **plistp;

                plistp = _RK_PTR(rd_kafka_pattern_list_t **, conf,
                                 prop->offset);

                if (*plistp)
                        rd_kafka_pattern_list_destroy(*plistp);

                if (!istr) {
                        *plistp = NULL;
                        break;
                }

                *plistp = rd_kafka_pattern_list_new(istr, errstr,
                                                    (int)errstr_size);
                if (!*plistp)
                        return RD_KAFKA_CONF_INVALID;

                break;
        }

        case _RK_C_INTERNAL:
                /* Probably handled by setter */
                break;

        default:
                rd_kafka_assert(NULL, !*"unknown conf type");
        }


        rd_kafka_anyconf_set_modified(conf, prop, 1/*modified*/);
        return RD_KAFKA_CONF_OK;
}


/**
 * @brief Search the s2i (string-to-int mapping) table of \p prop for an
 *        entry whose string equals \p value (case-insensitive).
 *
 * Table slots without a string are skipped.
 *
 * @returns the array index of the first matching entry, or -1 on miss.
 */
static int rd_kafka_conf_s2i_find (const struct rd_kafka_property *prop,
                                   const char *value) {
        const int cnt = (int)RD_ARRAYSIZE(prop->s2i);
        int idx = 0;

        while (idx < cnt) {
                const char *candidate = prop->s2i[idx].str;

                if (candidate && !rd_strcasecmp(candidate, value))
                        return idx;

                idx++;
        }

        return -1;
}


/**
 * @brief Set configuration property.
 *
 * Validates and converts the string \p value according to the type of
 * \p prop and then stores it through rd_kafka_anyconf_set_prop0().
 *
 * @param scope          Configuration scope flags.
 * @param conf           Configuration object to modify.
 * @param prop           Property to set (must not be an alias).
 * @param value          New value as a string, or NULL for no value.
 *                       For PTR properties this is the raw pointer to store.
 * @param allow_specific Allow rd_kafka_*conf_set_..() to be set,
 *        such as rd_kafka_conf_set_log_cb().
 *        Should not be allowed from the conf_set() string interface.
 * @param errstr         Buffer for a human readable error string.
 * @param errstr_size    Size of \p errstr.
 *
 * @returns RD_KAFKA_CONF_OK on success or RD_KAFKA_CONF_INVALID if the
 *          value is not acceptable (\p errstr is written in that case).
 *          For STR, KSTR, PATLIST and PTR properties the result of
 *          rd_kafka_anyconf_set_prop0() is returned.
 */
static rd_kafka_conf_res_t
rd_kafka_anyconf_set_prop (int scope, void *conf,
                           const struct rd_kafka_property *prop,
                           const char *value,
                           int allow_specific,
                           char *errstr, size_t errstr_size) {
        int ival;

        if (prop->unsupported) {
                rd_snprintf(errstr, errstr_size,
                            "Configuration property \"%s\" not supported "
                            "in this build: %s",
                            prop->name, prop->unsupported);
                return RD_KAFKA_CONF_INVALID;
        }

        switch (prop->type)
        {
        case _RK_C_STR:
        case _RK_C_KSTR:
                /* String property restricted to a fixed set of values */
                if (prop->s2i[0].str) {
                        int match;

                        if (!value ||
                            (match = rd_kafka_conf_s2i_find(prop, value)) == -1){
                                /* value may be NULL here: never pass a
                                 * NULL pointer to %s. */
                                rd_snprintf(errstr, errstr_size,
                                            "Invalid value for "
                                            "configuration property \"%s\": "
                                            "%s",
                                            prop->name,
                                            value ? value : "(null)");
                                return RD_KAFKA_CONF_INVALID;
                        }

                        /* Replace value string with canonical form */
                        value = prop->s2i[match].str;
                }
                /* FALLTHRU */
        case _RK_C_PATLIST:
                if (prop->validate &&
                    (!value || !prop->validate(prop, value, -1))) {
                        /* value may be NULL here: never pass a
                         * NULL pointer to %s. */
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value for "
                                    "configuration property \"%s\": %s",
                                    prop->name,
                                    value ? value : "(null)");
                        return RD_KAFKA_CONF_INVALID;
                }

                return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
                                                  _RK_CONF_PROP_SET_REPLACE,
                                                  errstr, errstr_size);

        case _RK_C_PTR:
                /* Allow hidden internal unit test properties to
                 * be set from generic conf_set() interface. */
                if (!allow_specific && !(prop->scope & _RK_HIDDEN)) {
                        rd_snprintf(errstr, errstr_size,
                                    "Property \"%s\" must be set through "
                                    "dedicated .._set_..() function",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }
                return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
                                                  _RK_CONF_PROP_SET_REPLACE,
                                                  errstr, errstr_size);

        case _RK_C_BOOL:
                if (!value) {
                        rd_snprintf(errstr, errstr_size,
                                 "Bool configuration property \"%s\" cannot "
                                 "be set to empty value", prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }


                if (!rd_strcasecmp(value, "true") ||
                    !rd_strcasecmp(value, "t") ||
                    !strcmp(value, "1"))
                        ival = 1;
                else if (!rd_strcasecmp(value, "false") ||
                         !rd_strcasecmp(value, "f") ||
                         !strcmp(value, "0"))
                        ival = 0;
                else {
                        rd_snprintf(errstr, errstr_size,
                                 "Expected bool value for \"%s\": "
                                 "true or false", prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                /* NOTE(review): the result of set_prop0() is not propagated
                 * here nor in the INT, DBL and S2I/S2F cases below -
                 * confirm that this is intentional. */
                rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
                                           _RK_CONF_PROP_SET_REPLACE,
                                           errstr, errstr_size);
                return RD_KAFKA_CONF_OK;

        case _RK_C_INT:
        {
                const char *end;

                if (!value) {
                        rd_snprintf(errstr, errstr_size,
                                 "Integer configuration "
                                 "property \"%s\" cannot be set "
                                 "to empty value", prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                /* Base 0: decimal, octal (0..) and hex (0x..) are accepted */
                ival = (int)strtol(value, (char **)&end, 0);
                if (end == value) {
                        /* Non numeric, check s2i for string mapping */
                        int match = rd_kafka_conf_s2i_find(prop, value);

                        if (match == -1) {
                                rd_snprintf(errstr, errstr_size,
                                            "Invalid value for "
                                            "configuration property \"%s\"",
                                            prop->name);
                                return RD_KAFKA_CONF_INVALID;
                        }

                        if (prop->s2i[match].unsupported) {
                                rd_snprintf(errstr, errstr_size,
                                            "Unsupported value \"%s\" for "
                                            "configuration property \"%s\": %s",
                                            value, prop->name,
                                            prop->s2i[match].unsupported);
                                return RD_KAFKA_CONF_INVALID;
                        }

                        ival = prop->s2i[match].val;
                }

                if (ival < prop->vmin ||
                    ival > prop->vmax) {
                        rd_snprintf(errstr, errstr_size,
                                 "Configuration property \"%s\" value "
                                 "%i is outside allowed range %i..%i\n",
                                 prop->name, ival,
                                 prop->vmin,
                                 prop->vmax);
                        return RD_KAFKA_CONF_INVALID;
                }

                rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
                                           _RK_CONF_PROP_SET_REPLACE,
                                           errstr, errstr_size);
                return RD_KAFKA_CONF_OK;
        }

        case _RK_C_DBL:
        {
                const char *end;
                double dval;

                if (!value) {
                        rd_snprintf(errstr, errstr_size,
                                 "Float configuration "
                                 "property \"%s\" cannot be set "
                                 "to empty value", prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                dval = strtod(value, (char **)&end);
                if (end == value) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value for "
                                    "configuration property \"%s\"",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                if (dval < prop->dmin ||
                    dval > prop->dmax) {
                        rd_snprintf(errstr, errstr_size,
                                 "Configuration property \"%s\" value "
                                 "%g is outside allowed range %g..%g\n",
                                 prop->name, dval,
                                 prop->dmin,
                                 prop->dmax);
                        return RD_KAFKA_CONF_INVALID;
                }

                /* The string is passed on: set_prop0() parses it again. */
                rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
                                           _RK_CONF_PROP_SET_REPLACE,
                                           errstr, errstr_size);
                return RD_KAFKA_CONF_OK;
        }

        case _RK_C_S2I:
        case _RK_C_S2F:
        {
                int j;
                const char *next;

                if (!value) {
                        rd_snprintf(errstr, errstr_size,
                                 "Configuration "
                                 "property \"%s\" cannot be set "
                                 "to empty value", prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                next = value;
                while (next && *next) {
                        /* [s, t) delimits the current token */
                        const char *s, *t;
                        rd_kafka_conf_set_mode_t set_mode = _RK_CONF_PROP_SET_ADD; /* S2F */

                        s = next;

                        if (prop->type == _RK_C_S2F &&
                            (t = strchr(s, ','))) {
                                /* CSV flag field */
                                next = t+1;
                        } else {
                                /* Single string */
                                t = s+strlen(s);
                                next = NULL;
                        }


                        /* Left trim.
                         * The cast to unsigned char keeps isspace() defined
                         * for characters with a negative char value. */
                        while (s < t && isspace((unsigned char)*s))
                                s++;

                        /* Right trim.
                         * t points one past the last character of the
                         * token, so it is the preceding character that
                         * must be inspected. */
                        while (t > s && isspace((unsigned char)t[-1]))
                                t--;

                        /* S2F: +/- prefix */
                        if (prop->type == _RK_C_S2F) {
                                if (*s == '+') {
                                        set_mode = _RK_CONF_PROP_SET_ADD;
                                        s++;
                                } else if (*s == '-') {
                                        set_mode = _RK_CONF_PROP_SET_DEL;
                                        s++;
                                }
                        }

                        /* Empty string? */
                        if (s == t)
                                continue;

                        /* Match string to s2i table entry */
                        for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
                                int new_val;

                                if (!prop->s2i[j].str)
                                        continue;

                                if (strlen(prop->s2i[j].str) == (size_t)(t-s) &&
                                         !rd_strncasecmp(prop->s2i[j].str, s,
                                                         (int)(t-s)))
                                        new_val = prop->s2i[j].val;
                                else
                                        continue;

                                if (prop->s2i[j].unsupported) {
                                        rd_snprintf(
                                                errstr, errstr_size,
                                                "Unsupported value \"%.*s\" "
                                                "for configuration property "
                                                "\"%s\": %s",
                                                (int)(t-s), s, prop->name,
                                                prop->s2i[j].unsupported);
                                        return RD_KAFKA_CONF_INVALID;
                                }

                                rd_kafka_anyconf_set_prop0(scope, conf, prop,
                                                           value, new_val,
                                                           set_mode,
                                                           errstr, errstr_size);

                                if (prop->type == _RK_C_S2F) {
                                        /* Flags: OR it in: do next */
                                        break;
                                } else {
                                        /* Single assignment */
                                        return RD_KAFKA_CONF_OK;
                                }
                        }

                        /* S2F: Good match: continue with next */
                        if (j < (int)RD_ARRAYSIZE(prop->s2i))
                                continue;

                        /* No match */
                        rd_snprintf(errstr, errstr_size,
                                "Invalid value \"%.*s\" for "
                                "configuration property \"%s\"",
                                (int)(t-s), s, prop->name);
                        return RD_KAFKA_CONF_INVALID;

                }
                return RD_KAFKA_CONF_OK;
        }

        case _RK_C_INTERNAL:
                rd_snprintf(errstr, errstr_size,
                            "Internal property \"%s\" not settable",
                            prop->name);
                return RD_KAFKA_CONF_INVALID;

        case _RK_C_INVALID:
                /* The property description holds the error text */
                rd_snprintf(errstr, errstr_size, "%s", prop->desc);
                return RD_KAFKA_CONF_INVALID;

        default:
                rd_kafka_assert(NULL, !*"unknown conf type");
        }

        /* not reachable */
        return RD_KAFKA_CONF_INVALID;
}



/**
 * @brief Apply property defaults to \p conf for all properties in \p scope.
 *
 * Alias and invalid-type properties are skipped. For every other property
 * its constructor (if any) is called, and if the property has a non-zero
 * default (string, integer, pointer or double) that default is stored
 * through rd_kafka_anyconf_set_prop0().
 */
static void rd_kafka_defaultconf_set (int scope, void *conf) {
        const struct rd_kafka_property *p;

        for (p = rd_kafka_properties ; p->name ; p++) {
                if (!(p->scope & scope) ||
                    p->type == _RK_C_ALIAS ||
                    p->type == _RK_C_INVALID)
                        continue;

                if (p->ctor)
                        p->ctor(scope, conf);

                /* No default of any kind: leave the field as it is. */
                if (!p->sdef && !p->vdef && !p->pdef &&
                    rd_dbl_zero(p->ddef))
                        continue;

                rd_kafka_anyconf_set_prop0(scope, conf, p,
                                           p->sdef ? p->sdef : p->pdef,
                                           p->vdef,
                                           _RK_CONF_PROP_SET_REPLACE,
                                           NULL, 0);
        }
}

/**
 * @brief Allocate a new global configuration object with all default
 *        property values applied and all is-modified state cleared.
 *
 * @returns the new configuration object.
 */
rd_kafka_conf_t *rd_kafka_conf_new (void) {
        rd_kafka_conf_t *conf;

        conf = rd_calloc(1, sizeof(*conf));

        /* Sanity check: RD_KAFKA_CONF_PROPS_IDX_MAX must be larger than
         * the size of the configuration struct. */
        rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*conf) &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");

        rd_kafka_defaultconf_set(_RK_GLOBAL, conf);

        /* Applying the defaults must not count as a modification. */
        rd_kafka_anyconf_clear_all_is_modified(conf);

        return conf;
}

/**
 * @brief Allocate a new topic configuration object with all default
 *        property values applied and all is-modified state cleared.
 *
 * @returns the new topic configuration object.
 */
rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) {
        rd_kafka_topic_conf_t *tconf;

        tconf = rd_calloc(1, sizeof(*tconf));

        /* Sanity check: RD_KAFKA_CONF_PROPS_IDX_MAX must be larger than
         * the size of the topic configuration struct. */
        rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*tconf) &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");

        rd_kafka_defaultconf_set(_RK_TOPIC, tconf);

        /* Applying the defaults must not count as a modification. */
        rd_kafka_anyconf_clear_all_is_modified(tconf);

        return tconf;
}


/**
 * @brief Set property \p name to \p value on \p conf using the string
 *        interface.
 *
 * An empty \p value is treated as NULL. For \c _RK_GLOBAL scope the
 * on_conf_set interceptors are tried first; if they handle the property
 * (return anything but RD_KAFKA_CONF_UNKNOWN) their result is returned.
 * Otherwise the property table is searched for \p name in \p scope.
 * An alias is followed by calling this function again with the alias
 * target name.
 *
 * @param errstr      Optional error string buffer, may be NULL.
 * @param errstr_size Size of \p errstr.
 *
 * @returns a rd_kafka_conf_res_t value: RD_KAFKA_CONF_UNKNOWN if no such
 *          property exists in \p scope, else the result of the
 *          interceptor or of rd_kafka_anyconf_set_prop().
 */
static int rd_kafka_anyconf_set (int scope, void *conf,
                                 const char *name, const char *value,
                                 char *errstr, size_t errstr_size) {
        char dummy_errstr[1];
        const struct rd_kafka_property *p;

        /* Always have a non-NULL error buffer pointer to pass along */
        if (errstr == NULL) {
                errstr = dummy_errstr;
                errstr_size = 0;
        }

        /* Empty string is the same as no value */
        if (value != NULL && *value == '\0')
                value = NULL;

        /* Try interceptors first (only for GLOBAL config for now) */
        if (scope & _RK_GLOBAL) {
                rd_kafka_conf_res_t ires;

                ires = rd_kafka_interceptors_on_conf_set(
                        (rd_kafka_conf_t *)conf, name, value,
                        errstr, errstr_size);

                /* Handled (successfully or not) by interceptor. */
                if (ires != RD_KAFKA_CONF_UNKNOWN)
                        return ires;
        }

        /* Then the property table */
        for (p = rd_kafka_properties ; p->name ; p++) {
                if (!(p->scope & scope) || strcmp(p->name, name) != 0)
                        continue;

                if (p->type != _RK_C_ALIAS)
                        return rd_kafka_anyconf_set_prop(
                                scope, conf, p, value,
                                0/*don't allow specifics*/,
                                errstr, errstr_size);

                /* Alias: start over with the real property name */
                return rd_kafka_anyconf_set(scope, conf,
                                            p->sdef, value,
                                            errstr, errstr_size);
        }

        rd_snprintf(errstr, errstr_size,
                    "No such configuration property: \"%s\"", name);

        return RD_KAFKA_CONF_UNKNOWN;
}


/**
 * @brief Set a rd_kafka_*_conf_set_...() specific property, such as
 *        rd_kafka_conf_set_error_cb().
 *
 * The property is looked up with rd_kafka_conf_prop_find() (aliases are
 * resolved) and set through rd_kafka_anyconf_set_prop() with
 * allow_specific enabled and without an error string buffer.
 *
 * @param SCOPE Configuration scope flags (evaluated twice).
 * @param CONF  Configuration object.
 * @param NAME  Property name.
 * @param VALUE Value, converted to a (const void *).
 *
 * @warning Does not go through rd_kafka_anyconf_set(), so the
 *          interceptor's on_conf_set is not called from there.
 *          Note that rd_kafka_anyconf_set_prop0() still calls the
 *          on_conf_set interceptors for global-scope properties that are
 *          not of pointer or internal type.
 * @warning Asserts if \p NAME is not known or value is incorrect.
 *
 * Implemented as a macro to have rd_assert() print the original function.
 * All macro arguments are parenthesized so that compound expressions
 * (e.g. a conditional expression as VALUE) expand as intended.
 */

#define rd_kafka_anyconf_set_internal(SCOPE,CONF,NAME,VALUE) do {       \
        const struct rd_kafka_property *_prop;                          \
        rd_kafka_conf_res_t _res;                                       \
        _prop = rd_kafka_conf_prop_find((SCOPE), (NAME));               \
        rd_assert(_prop && *"invalid property name");                   \
        _res = rd_kafka_anyconf_set_prop((SCOPE), (CONF), _prop,        \
                                         (const void *)(VALUE),         \
                                         1/*allow-specifics*/,          \
                                         NULL, 0);                      \
        rd_assert(_res == RD_KAFKA_CONF_OK);                            \
        } while (0)


/**
 * @brief Set configuration property \p name to \p value on \p conf.
 *
 * The property is first tried as a global property. If it is unknown in
 * the global scope it is set on the default topic configuration instead,
 * which is created if \p conf does not have one yet.
 *
 * @param errstr      Buffer for a human readable error string.
 * @param errstr_size Size of \p errstr.
 *
 * @returns the result of the global set operation unless that was
 *          RD_KAFKA_CONF_UNKNOWN, in which case the result of
 *          rd_kafka_topic_conf_set() is returned.
 */
rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
                                       const char *name,
                                       const char *value,
                                       char *errstr, size_t errstr_size) {
        rd_kafka_conf_res_t global_res;

        global_res = rd_kafka_anyconf_set(_RK_GLOBAL, conf, name, value,
                                          errstr, errstr_size);

        if (global_res == RD_KAFKA_CONF_UNKNOWN) {
                /* Not a global property: try it on the default topic
                 * config instead. */
                if (conf->topic_conf == NULL) {
                        /* Create topic config, might be over-written by
                         * application later. */
                        rd_kafka_conf_set_default_topic_conf(
                                conf, rd_kafka_topic_conf_new());
                }

                return rd_kafka_topic_conf_set(conf->topic_conf, name, value,
                                               errstr, errstr_size);
        }

        return global_res;
}


/**
 * @brief Set topic configuration property \p name to \p value on \p conf.
 *
 * A leading "topic." in \p name is stripped before the property lookup.
 *
 * @param errstr      Buffer for a human readable error string.
 * @param errstr_size Size of \p errstr.
 *
 * @returns the result of rd_kafka_anyconf_set() for the topic scope.
 */
rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
                                             const char *name,
                                             const char *value,
                                             char *errstr, size_t errstr_size) {
        static const char prefix[] = "topic.";
        const size_t prefix_len = sizeof(prefix) - 1;

        if (strncmp(name, prefix, prefix_len) == 0)
                name += prefix_len;

        return rd_kafka_anyconf_set(_RK_TOPIC, conf, name, value,
                                    errstr, errstr_size);
}


/**
 * @brief Overwrites the contents of \p str up until but not including
 *        the nul-term.
 *
 * Every character of the string is first set to nul. If the original
 * string was longer than sizeof("(REDACTED)") bytes the buffer is then
 * filled with the nul-terminated text "(REDACTED)"; shorter strings are
 * left zeroed (empty).
 *
 * @param str Nul-terminated, writable string to wipe in place.
 */
void rd_kafka_desensitize_str (char *str) {
        static const char redacted[] = "(REDACTED)";
        size_t len;

#ifdef _WIN32
        len = strlen(str);
        SecureZeroMemory(str, len);
#else
        /* Write through a volatile pointer so that the wipe is not
         * removed by the optimizer. */
        volatile char *volatile p = str;

        while (*p != '\0') {
                *p = '\0';
                p++;
        }

        len = (size_t)(p - str);
#endif

        /* Not enough room for the marker: leave the string zeroed. */
        if (len <= sizeof(redacted))
                return;

        memcpy(str, redacted, sizeof(redacted));
}




/**
 * @brief Overwrite the value of \p prop in \p conf, if the property is
 *        flagged as sensitive.
 *
 * Only string properties are actually overwritten. Sensitive internal
 * properties are left untouched, and any other sensitive property type
 * triggers an assert.
 */
static RD_INLINE void
rd_kafka_anyconf_prop_desensitize (int scope, void *conf,
                                   const struct rd_kafka_property *prop) {
        if (likely(!(prop->scope & _RK_SENSITIVE)))
                return;

        if (prop->type == _RK_C_STR) {
                char *secret = *_RK_PTR(char **, conf, prop->offset);

                if (secret)
                        rd_kafka_desensitize_str(secret);

        } else if (prop->type == _RK_C_INTERNAL) {
                /* This is typically a pointer to something, the
                 * _RK_SENSITIVE flag is set to get it redacted in
                 * ..dump_dbg(), but we don't have to desensitize
                 * anything here. */

        } else {
                rd_assert(!*"BUG: Don't know how to desensitize prop type");
        }
}


/**
 * @brief Desensitize all sensitive properties in \p conf that belong
 *        to \p scope.
 */
static void rd_kafka_anyconf_desensitize (int scope, void *conf) {
        const struct rd_kafka_property *p = rd_kafka_properties;

        for ( ; p->name ; p++) {
                if (p->scope & scope)
                        rd_kafka_anyconf_prop_desensitize(scope, conf, p);
        }
}

/**
 * @brief Overwrite the values of sensitive properties in \p conf and,
 *        if one is set, in its default topic configuration.
 */
void rd_kafka_conf_desensitize (rd_kafka_conf_t *conf) {
        rd_kafka_topic_conf_t *tconf = conf->topic_conf;

        /* Default topic config first, then the global config itself */
        if (tconf != NULL)
                rd_kafka_anyconf_desensitize(_RK_TOPIC, tconf);

        rd_kafka_anyconf_desensitize(_RK_GLOBAL, conf);
}

/**
 * @brief Overwrite the values of sensitive properties in the
 *        topic configuration \p tconf.
 */
void rd_kafka_topic_conf_desensitize (rd_kafka_topic_conf_t *tconf) {
        const int scope = _RK_TOPIC;

        rd_kafka_anyconf_desensitize(scope, tconf);
}


/**
 * @brief Clear the value of \p prop in \p conf, releasing what it owns.
 *
 * Sensitive values are overwritten before their memory is released.
 * String, Kafka-string and pattern-list values are freed/destroyed and
 * their field reset to NULL. For pointer properties only
 * "default_topic_conf" is destroyed here; other pointers are left as-is.
 * Finally the property's destructor (\c dtor), if any, is invoked.
 */
static void rd_kafka_anyconf_clear (int scope, void *conf,
                                    const struct rd_kafka_property *prop) {

        /* Scrub secrets before the backing memory is handed back. */
        rd_kafka_anyconf_prop_desensitize(scope, conf, prop);

        switch (prop->type)
        {
        case _RK_C_STR:
        {
                char **str = _RK_PTR(char **, conf, prop->offset);

                if (*str) {
                        /* Give the property's setter a chance to react to
                         * the deletion while the old value is still valid. */
                        if (prop->set)
                                prop->set(scope, conf, prop->name, NULL, *str,
                                          _RK_CONF_PROP_SET_DEL, NULL, 0);
                        rd_free(*str);
                        *str = NULL;
                }
        }
        break;

        case _RK_C_KSTR:
        {
                rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf,
                                                 prop->offset);
                if (*kstr) {
                        rd_kafkap_str_destroy(*kstr);
                        *kstr = NULL;
                }
        }
        break;

        case _RK_C_PATLIST:
        {
                rd_kafka_pattern_list_t **plist;
                plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
                if (*plist) {
                        rd_kafka_pattern_list_destroy(*plist);
                        *plist = NULL;
                }
        }
        break;

        case _RK_C_PTR:
                /* _RK_PTR() yields the address of the field inside conf,
                 * which is never NULL, so only the pointed-to value needs
                 * to be checked. */
                if (!strcmp(prop->name, "default_topic_conf")) {
                        rd_kafka_topic_conf_t **tconf;

                        tconf = _RK_PTR(rd_kafka_topic_conf_t **,
                                        conf, prop->offset);
                        if (*tconf) {
                                rd_kafka_topic_conf_destroy(*tconf);
                                *tconf = NULL;
                        }
                }
                break;

        default:
                break;
        }

        if (prop->dtor)
                prop->dtor(scope, conf);

}

/**
 * @brief Clear every property of \p conf that belongs to \p scope.
 *
 * For the global scope the on_conf_destroy() interceptors are invoked
 * before any property is cleared. The \p conf object itself is not freed.
 */
void rd_kafka_anyconf_destroy (int scope, void *conf) {
        const struct rd_kafka_property *p;

        /* Call on_conf_destroy() interceptors */
        if (scope == _RK_GLOBAL)
                rd_kafka_interceptors_on_conf_destroy(conf);

        p = rd_kafka_properties;
        while (p->name) {
                if (p->scope & scope)
                        rd_kafka_anyconf_clear(scope, conf, p);
                p++;
        }
}


/**
 * @brief Clear all global-scope properties of \p conf and free the
 *        configuration object itself.
 */
void rd_kafka_conf_destroy (rd_kafka_conf_t *conf) {
	rd_kafka_anyconf_destroy(_RK_GLOBAL, conf);
        //FIXME: partition_assignors
	rd_free(conf);
}

/**
 * @brief Clear all topic-scope properties of \p topic_conf and free the
 *        topic configuration object itself.
 */
void rd_kafka_topic_conf_destroy (rd_kafka_topic_conf_t *topic_conf) {
	rd_kafka_anyconf_destroy(_RK_TOPIC, topic_conf);
	rd_free(topic_conf);
}



/**
 * @brief Copy the properties of \p scope from \p src to \p dst.
 *
 * Each eligible property is read from \p src (converted to its string
 * representation where needed) and applied to \p dst through
 * rd_kafka_anyconf_set_prop0() with _RK_CONF_PROP_SET_REPLACE.
 *
 * Aliases and invalid properties are skipped, as are properties that have
 * not been modified in \p src (except internal ones), and properties
 * whose name starts with any of the \p filter prefixes.
 *
 * @param filter_cnt Number of elements in \p filter.
 * @param filter     Array of property name prefixes to exclude from the
 *                   copy; not dereferenced when \p filter_cnt is 0.
 */
static void rd_kafka_anyconf_copy (int scope, void *dst, const void *src,
                                   size_t filter_cnt, const char **filter) {
	const struct rd_kafka_property *prop;

	for (prop = rd_kafka_properties ; prop->name ; prop++) {
		const char *val = NULL;
		int ival = 0;
                char *valstr;
                size_t valsz;
                size_t fi;
                size_t nlen;

		if (!(prop->scope & scope))
			continue;

		if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
			continue;

                /* Skip properties that have not been set,
                 * unless it is an internal one which requires
                 * extra logic, such as the interceptors. */
                if (!rd_kafka_anyconf_is_modified(src, prop) &&
                    prop->type != _RK_C_INTERNAL)
                        continue;

                /* Apply filter, if any.
                 * A filter matches when it is a prefix of the
                 * property name. */
                nlen = strlen(prop->name);
                for (fi = 0 ; fi < filter_cnt ; fi++) {
                        size_t flen = strlen(filter[fi]);
                        if (nlen >= flen &&
                            !strncmp(filter[fi], prop->name, flen))
                                break;
                }
                if (fi < filter_cnt)
                        continue; /* Filter matched */

		switch (prop->type)
		{
		case _RK_C_STR:
		case _RK_C_PTR:
			val = *_RK_PTR(const char **, src, prop->offset);

                        /* The default topic config is duplicated rather
                         * than having dst share src's pointer. */
                        if (!strcmp(prop->name, "default_topic_conf") && val)
                                val = (void *)rd_kafka_topic_conf_dup(
                                        (const rd_kafka_topic_conf_t *)
                                        (void *)val);
			break;
                case _RK_C_KSTR:
                {
                        rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **,
                                                         src, prop->offset);
                        if (*kstr)
                                val = (*kstr)->str;
                        break;
                }

		case _RK_C_BOOL:
		case _RK_C_INT:
		case _RK_C_S2I:
		case _RK_C_S2F:
			ival = *_RK_PTR(const int *, src, prop->offset);

                        /* Get string representation of configuration value.
                         * First call queries the needed size, second call
                         * writes the value.
                         * NOTE(review): the return value of
                         * rd_kafka_anyconf_get0() is not checked; if it
                         * fails valsz remains 0 and valstr is not
                         * written - confirm this cannot happen here. */
                        valsz = 0;
                        rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
                        valstr = rd_alloca(valsz);
                        rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
                        val = valstr;
			break;
                case _RK_C_DBL:
                        /* Get string representation of configuration value. */
                        valsz = 0;
                        rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
                        valstr = rd_alloca(valsz);
                        rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
                        val = valstr;
                        break;
                case _RK_C_PATLIST:
                {
                        const rd_kafka_pattern_list_t **plist;
                        plist = _RK_PTR(const rd_kafka_pattern_list_t **,
                                        src, prop->offset);
			if (*plist)
				val = (*plist)->rkpl_orig;
                        break;
                }
                case _RK_C_INTERNAL:
                        /* Handled by ->copy() below. */
                        break;
		default:
                        /* Unknown property type: nothing to copy. */
			continue;
		}

                /* Property-specific copy hook, called before the generic
                 * set below. */
                if (prop->copy)
                        prop->copy(scope, dst, src,
                                   _RK_PTR(void *, dst, prop->offset),
                                   _RK_PTR(const void *, src, prop->offset),
                                   filter_cnt, filter);

                rd_kafka_anyconf_set_prop0(scope, dst, prop, val, ival,
                                           _RK_CONF_PROP_SET_REPLACE, NULL, 0);
	}
}


/**
 * @brief Create a new global configuration object holding a copy of
 *        \p conf, with no property filter applied.
 *
 * The on_conf_dup() interceptors are invoked before the properties
 * are copied.
 *
 * @returns the newly created configuration object.
 */
rd_kafka_conf_t *rd_kafka_conf_dup (const rd_kafka_conf_t *conf) {
        rd_kafka_conf_t *copy;

        copy = rd_kafka_conf_new();
        rd_kafka_interceptors_on_conf_dup(copy, conf, 0, NULL);
        rd_kafka_anyconf_copy(_RK_GLOBAL, copy, conf, 0, NULL);

        return copy;
}

/**
 * @brief Create a new global configuration object holding a copy of
 *        \p conf, excluding properties matched by \p filter.
 *
 * The filter is handed both to the on_conf_dup() interceptors and to the
 * property copy.
 *
 * @param filter_cnt Number of elements in \p filter.
 * @param filter     Property name prefixes to leave out of the copy.
 *
 * @returns the newly created configuration object.
 */
rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf,
                                           size_t filter_cnt,
                                           const char **filter) {
        rd_kafka_conf_t *copy;

        copy = rd_kafka_conf_new();
        rd_kafka_interceptors_on_conf_dup(copy, conf, filter_cnt, filter);
        rd_kafka_anyconf_copy(_RK_GLOBAL, copy, conf, filter_cnt, filter);

        return copy;
}


/**
 * @brief Create a new topic configuration object holding a copy of the
 *        topic-scope properties of \p conf.
 *
 * @returns the newly created topic configuration object.
 */
rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup (const rd_kafka_topic_conf_t
                                                *conf) {
        rd_kafka_topic_conf_t *copy;

        copy = rd_kafka_topic_conf_new();
        rd_kafka_anyconf_copy(_RK_TOPIC, copy, conf, 0, NULL);

        return copy;
}

/**
 * @brief Return a new topic configuration object for \p rk: a copy of the
 *        instance's default topic configuration if it has one, otherwise
 *        a freshly created topic configuration.
 */
rd_kafka_topic_conf_t *rd_kafka_default_topic_conf_dup (rd_kafka_t *rk) {
        return rk->rk_conf.topic_conf ?
                rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf) :
                rd_kafka_topic_conf_new();
}

/**
 * @brief Set the global "enabled_events" property of \p conf.
 *
 * \p events is formatted as a decimal string and passed to
 * rd_kafka_anyconf_set_internal().
 */
void rd_kafka_conf_set_events (rd_kafka_conf_t *conf, int events) {
        char tmp[32];
        rd_snprintf(tmp, sizeof(tmp), "%d", events);
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "enabled_events", tmp);
}

/**
 * @brief Pass \p event_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "background_event_cb" property of \p conf.
 */
void
rd_kafka_conf_set_background_event_cb (rd_kafka_conf_t *conf,
                                       void (*event_cb) (rd_kafka_t *rk,
                                                         rd_kafka_event_t *rkev,
                                                         void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "background_event_cb",
                                      event_cb);
}


/**
 * @brief Pass \p dr_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "dr_cb" property of \p conf.
 */
void rd_kafka_conf_set_dr_cb (rd_kafka_conf_t *conf,
			      void (*dr_cb) (rd_kafka_t *rk,
					     void *payload, size_t len,
					     rd_kafka_resp_err_t err,
					     void *opaque, void *msg_opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_cb", dr_cb);
}


/**
 * @brief Pass \p dr_msg_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "dr_msg_cb" property of \p conf.
 */
void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,
                                  void (*dr_msg_cb) (rd_kafka_t *rk,
                                                     const rd_kafka_message_t *
                                                     rkmessage,
                                                     void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_msg_cb", dr_msg_cb);
}


/**
 * @brief Pass \p consume_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "consume_cb" property of \p conf.
 */
void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
                                   void (*consume_cb) (rd_kafka_message_t *
                                                       rkmessage,
                                                       void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "consume_cb",
                                      consume_cb);
}

/**
 * @brief Pass \p rebalance_cb to rd_kafka_anyconf_set_internal() as the
 *        value of the global "rebalance_cb" property of \p conf.
 */
void rd_kafka_conf_set_rebalance_cb (
        rd_kafka_conf_t *conf,
        void (*rebalance_cb) (rd_kafka_t *rk,
                              rd_kafka_resp_err_t err,
                              rd_kafka_topic_partition_list_t *partitions,
                              void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "rebalance_cb",
                                      rebalance_cb);
}

/**
 * @brief Pass \p offset_commit_cb to rd_kafka_anyconf_set_internal() as the
 *        value of the global "offset_commit_cb" property of \p conf.
 */
void rd_kafka_conf_set_offset_commit_cb (
        rd_kafka_conf_t *conf,
        void (*offset_commit_cb) (rd_kafka_t *rk,
                                  rd_kafka_resp_err_t err,
                                  rd_kafka_topic_partition_list_t *offsets,
                                  void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "offset_commit_cb",
                                      offset_commit_cb);
}



/**
 * @brief Pass \p error_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "error_cb" property of \p conf.
 */
void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf,
				 void  (*error_cb) (rd_kafka_t *rk, int err,
						    const char *reason,
						    void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "error_cb", error_cb);
}


/**
 * @brief Pass \p throttle_cb to rd_kafka_anyconf_set_internal() as the
 *        value of the global "throttle_cb" property of \p conf.
 */
void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
				    void (*throttle_cb) (
					    rd_kafka_t *rk,
					    const char *broker_name,
					    int32_t broker_id,
					    int throttle_time_ms,
					    void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "throttle_cb",
                                      throttle_cb);
}


/**
 * @brief Pass \p log_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "log_cb" property of \p conf.
 *
 * In builds without syslog support (WITH_SYSLOG is false) an assertion
 * is raised if \p log_cb is rd_kafka_log_syslog.
 */
void rd_kafka_conf_set_log_cb (rd_kafka_conf_t *conf,
			  void (*log_cb) (const rd_kafka_t *rk, int level,
                                          const char *fac, const char *buf)) {
#if !WITH_SYSLOG
        if (log_cb == rd_kafka_log_syslog)
                rd_assert(!*"syslog support not enabled in this build");
#endif
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "log_cb", log_cb);
}


/**
 * @brief Pass \p stats_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "stats_cb" property of \p conf.
 */
void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf,
				 int (*stats_cb) (rd_kafka_t *rk,
						  char *json,
						  size_t json_len,
						  void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "stats_cb", stats_cb);
}

/**
 * @brief Pass \p oauthbearer_token_refresh_cb to
 *        rd_kafka_anyconf_set_internal() as the value of the global
 *        "oauthbearer_token_refresh_cb" property of \p conf.
 *
 * @remark This is a silent no-op in builds where WITH_SASL_OAUTHBEARER
 *         is not enabled.
 */
void rd_kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_conf_t *conf,
                void (*oauthbearer_token_refresh_cb) (
                        rd_kafka_t *rk,
                        const char *oauthbearer_config,
                        void *opaque)) {
#if WITH_SASL_OAUTHBEARER
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf,
                "oauthbearer_token_refresh_cb", oauthbearer_token_refresh_cb);
#endif
}

/**
 * @brief Pass \p socket_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "socket_cb" property of \p conf.
 */
void rd_kafka_conf_set_socket_cb (rd_kafka_conf_t *conf,
                                  int (*socket_cb) (int domain, int type,
                                                    int protocol,
                                                    void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "socket_cb",
                                      socket_cb);
}

/**
 * @brief Pass \p connect_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "connect_cb" property of \p conf.
 */
void
rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf,
                              int (*connect_cb) (int sockfd,
                                                 const struct sockaddr *addr,
                                                 int addrlen,
                                                 const char *id,
                                                 void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "connect_cb",
                                      connect_cb);
}

/**
 * @brief Pass \p closesocket_cb to rd_kafka_anyconf_set_internal() as the
 *        value of the global "closesocket_cb" property of \p conf.
 */
void
rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf,
                                  int (*closesocket_cb) (int sockfd,
                                                         void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "closesocket_cb",
                                      closesocket_cb);
}



#ifndef _WIN32
/**
 * @brief Pass \p open_cb to rd_kafka_anyconf_set_internal() as the value
 *        of the global "open_cb" property of \p conf.
 *
 * @remark Only compiled on non-Windows platforms.
 */
void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
                                int (*open_cb) (const char *pathname,
                                                int flags, mode_t mode,
                                                void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "open_cb", open_cb);
}
#endif


/**
 * @brief Pass \p ssl_cert_verify_cb to rd_kafka_anyconf_set_internal() as
 *        the value of the global "ssl.certificate.verify_cb" property
 *        of \p conf.
 *
 * @returns RD_KAFKA_CONF_OK when built with SSL support (WITH_SSL),
 *          else RD_KAFKA_CONF_INVALID without touching \p conf.
 */
rd_kafka_conf_res_t
rd_kafka_conf_set_ssl_cert_verify_cb (
        rd_kafka_conf_t *conf,
        int (*ssl_cert_verify_cb) (rd_kafka_t *rk,
                                   const char *broker_name,
                                   int32_t broker_id,
                                   int *x509_set_error,
                                   int depth,
                                   const char *buf, size_t size,
                                   char *errstr, size_t errstr_size,
                                   void *opaque)) {
#if !WITH_SSL
        return RD_KAFKA_CONF_INVALID;
#else
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf,
                                      "ssl.certificate.verify_cb",
                                      ssl_cert_verify_cb);
        return RD_KAFKA_CONF_OK;
#endif
}


/**
 * @brief Pass \p opaque to rd_kafka_anyconf_set_internal() as the value
 *        of the global "opaque" property of \p conf.
 */
void rd_kafka_conf_set_opaque (rd_kafka_conf_t *conf, void *opaque) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "opaque", opaque);
}


/**
 * @brief Install \p tconf as the default topic configuration of \p conf.
 *
 * Any previously installed default topic configuration is destroyed
 * before \p tconf is stored in the global "default_topic_conf" property.
 * Passing the object that is already installed leaves it intact.
 */
void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
                                           rd_kafka_topic_conf_t *tconf) {
        /* Do not destroy the current object if the caller hands the very
         * same object back: that would store a dangling pointer. */
        if (conf->topic_conf && conf->topic_conf != tconf)
                rd_kafka_topic_conf_destroy(conf->topic_conf);

        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "default_topic_conf",
                                      tconf);
}

/**
 * @returns the default topic configuration currently attached to \p conf
 *          (the \c topic_conf field as-is, not a copy).
 */
rd_kafka_topic_conf_t *
rd_kafka_conf_get_default_topic_conf (rd_kafka_conf_t *conf) {
        return conf->topic_conf;
}


/**
 * @brief Pass \p partitioner to rd_kafka_anyconf_set_internal() as the
 *        value of the topic-scope "partitioner_cb" property
 *        of \p topic_conf.
 */
void
rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
					int32_t (*partitioner) (
						const rd_kafka_topic_t *rkt,
						const void *keydata,
						size_t keylen,
						int32_t partition_cnt,
						void *rkt_opaque,
						void *msg_opaque)) {
        rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "partitioner_cb",
                                      partitioner);
}

/**
 * @brief Pass \p msg_order_cmp to rd_kafka_anyconf_set_internal() as the
 *        value of the topic-scope "msg_order_cmp" property
 *        of \p topic_conf.
 */
void
rd_kafka_topic_conf_set_msg_order_cmp (rd_kafka_topic_conf_t *topic_conf,
                                       int (*msg_order_cmp) (
                                               const rd_kafka_message_t *a,
                                               const rd_kafka_message_t *b)) {
        rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "msg_order_cmp",
                                      msg_order_cmp);
}

/**
 * @brief Pass \p opaque to rd_kafka_anyconf_set_internal() as the value
 *        of the topic-scope "opaque" property of \p topic_conf.
 */
void rd_kafka_topic_conf_set_opaque (rd_kafka_topic_conf_t *topic_conf,
				     void *opaque) {
        rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "opaque", opaque);
}




/**
 * @brief Convert flags \p ival to a \p delim separated string using the
 *        S2F (or S2I) property \p prop.
 *
 * This function has two modes: size query and write.
 * To query for needed size call with dest==NULL,
 * to write to buffer of size dest_size call with dest!=NULL.
 *
 * An \p ival of -1 means all.
 *
 * In write mode the output is always nul-terminated when \p dest_size > 0
 * and is silently truncated if \p dest is too small.
 *
 * @param delim Delimiter inserted between values, may be longer than one
 *              character.
 * @param include_unsupported Include flag values that are unsupported
 *                            due to missing dependencies at build time.
 *
 * @returns the number of bytes written to \p dest (if not NULL), else the
 *          total number of bytes needed. Both counts include the
 *          terminating nul.
 *
 */
static
size_t rd_kafka_conf_flags2str (char *dest, size_t dest_size, const char *delim,
                                const struct rd_kafka_property *prop,
                                int ival,
                                rd_bool_t include_unsupported) {
        const size_t delim_len = strlen(delim);
        size_t of = 0;
        int j;

        if (dest && dest_size > 0)
                *dest = '\0';

        /* Size query (dest == NULL): accumulate the needed size.
         * Write mode (dest != NULL): append each matching value to dest.
         * The scan stops at the first unused s2i slot, but never runs
         * past the end of the s2i array. */
        for (j = 0 ;
             j < (int)RD_ARRAYSIZE(prop->s2i) && prop->s2i[j].str ;
             j++) {
                if (prop->type == _RK_C_S2F && ival != -1 &&
                    (ival & prop->s2i[j].val) != prop->s2i[j].val)
                        continue;
                else if (prop->type == _RK_C_S2I &&
                         ival != -1 && prop->s2i[j].val != ival)
                        continue;
                else if (prop->s2i[j].unsupported && !include_unsupported)
                        continue;

                if (!dest) {
                        of += strlen(prop->s2i[j].str) +
                                (of > 0 ? delim_len : 0);
                } else {
                        size_t r;

                        r = rd_snprintf(dest+of, dest_size-of,
                                        "%s%s",
                                        of > 0 ? delim:"",
                                        prop->s2i[j].str);
                        /* snprintf() returns the untruncated length
                         * excluding the nul, so a result equal to the
                         * remaining space is already a truncation. */
                        if (r >= dest_size-of) {
                                /* dest is full up to its final nul. */
                                if (dest_size > 0)
                                        of = dest_size-1;
                                break;
                        }
                        of += r;
                }
        }

        return of+1/*nul*/;
}


/**
 * @brief Return the "original" (re-created) configuration value string
 *        of \p prop in \p conf.
 *
 * This function has two modes:
 *  - size query: \p dest is NULL (or \p *dest_size is 0), nothing is
 *    written and only \p *dest_size is updated.
 *  - write: the value is copied to \p dest, truncated to fit
 *    \p *dest_size bytes and always nul-terminated.
 *
 * @param dest_size In write mode: the size of \p dest on input.
 *                  On successful return: the size needed to hold the full
 *                  value including the nul (may be an over-estimate for
 *                  S2F properties in size query mode).
 *
 * @returns RD_KAFKA_CONF_OK on success, or RD_KAFKA_CONF_INVALID if the
 *          property has no value or its type has no string
 *          representation (\p *dest_size is then left untouched).
 */
static rd_kafka_conf_res_t
rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop,
                       char *dest, size_t *dest_size) {
        char tmp[22];
        const char *val = NULL;
        size_t val_len = 0;
        int j;

        /* A zero-sized buffer cannot even hold the nul terminator:
         * treat it as a size query rather than letting the size
         * arithmetic below underflow and overrun the buffer. */
        if (dest && *dest_size == 0)
                dest = NULL;

        switch (prop->type)
        {
        case _RK_C_STR:
                val = *_RK_PTR(const char **, conf, prop->offset);
                break;

        case _RK_C_KSTR:
        {
                const rd_kafkap_str_t **kstr = _RK_PTR(const rd_kafkap_str_t **,
                                                       conf, prop->offset);
                if (*kstr)
                        val = (*kstr)->str;
                break;
        }

        case _RK_C_PTR:
                /* Pointers are rendered as their address, unset (NULL)
                 * pointers have no value. */
                val = *_RK_PTR(const void **, conf, prop->offset);
                if (val) {
                        rd_snprintf(tmp, sizeof(tmp), "%p", (void *)val);
                        val = tmp;
                }
                break;

        case _RK_C_BOOL:
                val = (*_RK_PTR(int *, conf, prop->offset) ? "true" : "false");
                break;

        case _RK_C_INT:
                rd_snprintf(tmp, sizeof(tmp), "%i",
                            *_RK_PTR(int *, conf, prop->offset));
                val = tmp;
                break;

        case _RK_C_DBL:
                rd_snprintf(tmp, sizeof(tmp), "%g",
                            *_RK_PTR(double *, conf, prop->offset));
                val = tmp;
                break;

        case _RK_C_S2I:
                /* Map the stored integer back to its string name. */
                for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
                        if (prop->s2i[j].val ==
                            *_RK_PTR(int *, conf, prop->offset)) {
                                val = prop->s2i[j].str;
                                break;
                        }
                }
                break;

        case _RK_C_S2F:
        {
                const int ival = *_RK_PTR(const int *, conf, prop->offset);

                val_len = rd_kafka_conf_flags2str(dest,
                                                  dest ? *dest_size : 0, ",",
                                                  prop, ival,
                                                  rd_false/*only supported*/);
                if (dest) {
                        /* The value has already been written to dest:
                         * clear dest so it is not copied onto itself
                         * below, and let the common path compute the
                         * returned size from the written string. */
                        val_len = 0;
                        val = dest;
                        dest = NULL;
                }
                break;
        }

        case _RK_C_PATLIST:
        {
                const rd_kafka_pattern_list_t **plist;
                plist = _RK_PTR(const rd_kafka_pattern_list_t **,
                                conf, prop->offset);
                if (*plist)
                        val = (*plist)->rkpl_orig;
                break;
        }

        default:
                break;
        }

        /* S2F size query: the needed size is already known. */
        if (val_len) {
                *dest_size = val_len+1;
                return RD_KAFKA_CONF_OK;
        }

        if (!val)
                return RD_KAFKA_CONF_INVALID;

        val_len = strlen(val);

        if (dest) {
                /* *dest_size is guaranteed to be > 0 here. */
                size_t use_len = RD_MIN(val_len, (*dest_size)-1);
                memcpy(dest, val, use_len);
                dest[use_len] = '\0';
        }

        /* Return needed size */
        *dest_size = val_len+1;

        return RD_KAFKA_CONF_OK;
}


/**
 * @brief Look up property \p name within \p scope and retrieve its value
 *        string from \p conf into \p dest / \p dest_size.
 *
 * Aliases are resolved by looking up the property they refer to.
 *
 * @returns RD_KAFKA_CONF_OK if a value was retrieved, the result of the
 *          aliased lookup for aliases, else RD_KAFKA_CONF_UNKNOWN.
 */
static rd_kafka_conf_res_t rd_kafka_anyconf_get (int scope, const void *conf,
                                                 const char *name,
                                                 char *dest, size_t *dest_size){
        const struct rd_kafka_property *p;

        for (p = rd_kafka_properties; p->name ; p++) {
                if ((p->scope & scope) && !strcmp(p->name, name)) {
                        /* Follow the alias to the real property. */
                        if (p->type == _RK_C_ALIAS)
                                return rd_kafka_anyconf_get(scope, conf,
                                                            p->sdef,
                                                            dest, dest_size);

                        /* On failure keep scanning for another match. */
                        if (rd_kafka_anyconf_get0(conf, p, dest, dest_size) ==
                            RD_KAFKA_CONF_OK)
                                return RD_KAFKA_CONF_OK;
                }
        }

        return RD_KAFKA_CONF_UNKNOWN;
}

/**
 * @brief Retrieve the value string of topic-scope property \p name
 *        from \p conf.
 *
 * @returns the result of rd_kafka_anyconf_get() for the topic scope.
 */
rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
                                             const char *name,
                                             char *dest, size_t *dest_size) {
        return rd_kafka_anyconf_get(_RK_TOPIC, conf, name, dest, dest_size);
}

/**
 * @brief Retrieve the value string of property \p name from \p conf.
 *
 * The global properties are searched first. Only if the name is unknown
 * there, and \p conf has a default topic configuration, is the lookup
 * retried against that topic configuration.
 */
rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
                                       const char *name,
                                       char *dest, size_t *dest_size) {
        rd_kafka_conf_res_t global_res =
                rd_kafka_anyconf_get(_RK_GLOBAL, conf, name, dest, dest_size);

        /* Unknown as a global property: try the default topic config,
         * if any. */
        if (global_res == RD_KAFKA_CONF_UNKNOWN && conf->topic_conf)
                return rd_kafka_topic_conf_get(conf->topic_conf, name,
                                               dest, dest_size);

        return global_res;
}


/**
 * @brief Build an array of alternating property name and value strings
 *        for the properties of \p scope in \p conf.
 *
 * Aliases, invalid properties and properties without a retrievable value
 * are left out.
 *
 * @param cntp             Set to the number of strings in the returned
 *                         array (two per dumped property).
 * @param only_modified    Only include properties that have been modified.
 * @param redact_sensitive Replace the value of sensitive properties
 *                         with "[redacted]".
 *
 * @returns a newly allocated array of newly allocated strings.
 */
static const char **rd_kafka_anyconf_dump (int scope, const void *conf,
                                           size_t *cntp,
                                           rd_bool_t only_modified,
                                           rd_bool_t redact_sensitive) {
        const struct rd_kafka_property *p;
        char **out;
        int used = 0;

        /* Room for one name and one value per known property. */
        out = rd_calloc(sizeof(char *), RD_ARRAYSIZE(rd_kafka_properties)*2);

        for (p = rd_kafka_properties; p->name ; p++) {
                char *valstr = NULL;
                size_t valsz;

                /* Skip out-of-scope and (optionally) unmodified properties.
                 * Skip aliases, show original property instead.
                 * Skip invalids. */
                if (!(p->scope & scope) ||
                    (only_modified &&
                     !rd_kafka_anyconf_is_modified(conf, p)) ||
                    p->type == _RK_C_ALIAS ||
                    p->type == _RK_C_INVALID)
                        continue;

                if (!redact_sensitive || !(p->scope & _RK_SENSITIVE)) {
                        /* Query value size */
                        if (rd_kafka_anyconf_get0(conf, p, NULL, &valsz) !=
                            RD_KAFKA_CONF_OK)
                                continue;

                        /* Get value */
                        valstr = rd_malloc(valsz);
                        rd_kafka_anyconf_get0(conf, p, valstr, &valsz);
                } else {
                        valstr = rd_strdup("[redacted]");
                }

                out[used]   = rd_strdup(p->name);
                out[used+1] = valstr;
                used += 2;
        }

        *cntp = used;

        return (const char **)out;
}


/**
 * @brief Dump all global-scope properties of \p conf, unredacted, as an
 *        array of alternating name and value strings.
 *
 * @param cntp Set to the number of strings in the returned array.
 */
const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp) {
	return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp,
                                     rd_false/*all*/,
                                     rd_false/*don't redact*/);
}

/**
 * @brief Dump all topic-scope properties of \p conf, unredacted, as an
 *        array of alternating name and value strings.
 *
 * @param cntp Set to the number of strings in the returned array.
 */
const char **rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf,
				       size_t *cntp) {
	return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp,
                                     rd_false/*all*/,
                                     rd_false/*don't redact*/);
}

/**
 * @brief Free an array of \p cnt strings, and the strings themselves,
 *        as returned by the configuration dump functions.
 */
void rd_kafka_conf_dump_free (const char **arr, size_t cnt) {
        char **_arr = (char **)arr;
        size_t i; /* same type as cnt so the index cannot wrap before it */

        for (i = 0 ; i < cnt ; i++)
                if (_arr[i])
                        rd_free(_arr[i]);

        rd_free(_arr);
}



/**
 * @brief Log the modified properties of \p scope in \p conf to the CONF
 *        debug context of \p rk, with sensitive values redacted.
 *
 * @param description Heading line logged before the properties, only
 *                    emitted if there is at least one property to log.
 */
void rd_kafka_anyconf_dump_dbg (rd_kafka_t *rk, int scope, const void *conf,
                                const char *description) {
        size_t cnt;
        size_t i = 0;
        const char **kv = rd_kafka_anyconf_dump(scope, conf, &cnt,
                                                rd_true/*modified only*/,
                                                rd_true/*redact sensitive*/);

        if (cnt > 0)
                rd_kafka_dbg(rk, CONF, "CONF", "%s:", description);

        /* Entries come in name, value pairs. */
        while (i < cnt) {
                rd_kafka_dbg(rk, CONF, "CONF", "  %s = %s", kv[i], kv[i+1]);
                i += 2;
        }

        rd_kafka_conf_dump_free(kv, cnt);
}

/**
 * @brief Print a table of all configuration properties to \p fp.
 *
 * Hidden and invalid properties are left out. A new section heading and
 * table header is emitted each time the property scope (global or topic)
 * changes. Each row holds the property name, its consumer/producer
 * applicability, value range, default value, importance and description.
 * Aliases are listed under their own name but show the range, default
 * and description of the property they refer to.
 */
void rd_kafka_conf_properties_show (FILE *fp) {
	const struct rd_kafka_property *prop0;
	int last = 0;
	int j;
	char tmp[512];
	const char *dash80 = "----------------------------------------"
		"----------------------------------------";

	for (prop0 = rd_kafka_properties; prop0->name ; prop0++) {
		const char *typeinfo = "";
                const char *importance;
                const struct rd_kafka_property *prop = prop0;

                /* Skip hidden properties. */
                if (prop->scope & _RK_HIDDEN)
                        continue;

                /* Skip invalid properties. */
                if (prop->type == _RK_C_INVALID)
                        continue;

                /* Scope changed since the previous row (or first row):
                 * emit section heading and table header. */
		if (!(prop->scope & last)) {
			fprintf(fp,
				"%s## %s configuration properties\n\n",
				last ? "\n\n":"",
				prop->scope == _RK_GLOBAL ? "Global": "Topic");

			fprintf(fp,
				"%-40s | %3s | %-15s | %13s | %-10s | %-25s\n"
				"%.*s-|-%.*s-|-%.*s-|-%.*s:|-%.*s-| -%.*s\n",
				"Property", "C/P", "Range",
				"Default", "Importance", "Description",
				40, dash80, 3, dash80, 15, dash80,
				13, dash80, 10, dash80, 25, dash80);

			last = prop->scope & (_RK_GLOBAL|_RK_TOPIC);

		}

		fprintf(fp, "%-40s | ", prop->name);

                /* For aliases, use the aliased property from here on
                 * so that the alias property shows up with proper
                 * ranges, defaults, etc. */
                if (prop->type == _RK_C_ALIAS) {
                        prop = rd_kafka_conf_prop_find(prop->scope,
                                                       prop->sdef);
                        rd_assert(prop && *"BUG: "
                                  "alias points to unknown config property");
                }

                /* " * " when the producer and consumer flags are either
                 * both set or both unset, else " P " or " C ". */
                fprintf(fp, "%3s | ",
                        (!(prop->scope & _RK_PRODUCER) ==
                         !(prop->scope & _RK_CONSUMER) ? " * " :
                         ((prop->scope & _RK_PRODUCER) ? " P " : " C ")));

                /* Range and Default columns, depending on type. */
		switch (prop->type)
		{
		case _RK_C_STR:
                case _RK_C_KSTR:
			typeinfo = "string";
                        /* FALLTHRU */
                case _RK_C_PATLIST:
			if (prop->type == _RK_C_PATLIST)
				typeinfo = "pattern list";
			if (prop->s2i[0].str) {
				rd_kafka_conf_flags2str(
                                        tmp, sizeof(tmp), ", ",
                                        prop, -1,
                                        rd_true/*include unsupported*/);
				fprintf(fp, "%-15s | %13s",
					tmp, prop->sdef ? prop->sdef : "");
			} else {
				fprintf(fp, "%-15s | %13s",
					"", prop->sdef ? prop->sdef : "");
			}
			break;
		case _RK_C_BOOL:
			typeinfo = "boolean";
			fprintf(fp, "%-15s | %13s", "true, false",
				prop->vdef ? "true" : "false");
			break;
		case _RK_C_INT:
			typeinfo = "integer";
			rd_snprintf(tmp, sizeof(tmp),
				    "%d .. %d", prop->vmin, prop->vmax);
			fprintf(fp, "%-15s | %13i", tmp, prop->vdef);
			break;
                case _RK_C_DBL:
                        typeinfo = "float"; /* more user-friendly than double */
                        rd_snprintf(tmp, sizeof(tmp),
                                    "%g .. %g", prop->dmin, prop->dmax);
                        fprintf(fp, "%-15s | %13g", tmp, prop->ddef);
                        break;
		case _RK_C_S2I:
			typeinfo = "enum value";
			rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
						prop, -1,
                                                rd_true/*include unsupported*/);
			fprintf(fp, "%-15s | ", tmp);

                        /* Show the name of the default value, or a blank
                         * if no enum entry matches it. */
			for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
				if (prop->s2i[j].val == prop->vdef) {
					fprintf(fp, "%13s", prop->s2i[j].str);
					break;
				}
			}
			if (j == RD_ARRAYSIZE(prop->s2i))
				fprintf(fp, "%13s", " ");
			break;

		case _RK_C_S2F:
			typeinfo = "CSV flags";
			/* Dont duplicate builtin.features value in
			 * both Range and Default */
			if (!strcmp(prop->name, "builtin.features"))
				*tmp = '\0';
			else
				rd_kafka_conf_flags2str(
                                        tmp, sizeof(tmp), ", ",
                                        prop, -1,
                                        rd_true/*include unsupported*/);
			fprintf(fp, "%-15s | ", tmp);
			rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
						prop, prop->vdef,
                                                rd_true/*include unsupported*/);
			fprintf(fp, "%13s", tmp);

			break;
		case _RK_C_PTR:
                case _RK_C_INTERNAL:
                        typeinfo = "see dedicated API";
                        /* FALLTHRU */
		default:
			fprintf(fp, "%-15s | %-13s", "", " ");
			break;
		}

                if (prop->scope & _RK_HIGH)
                        importance = "high";
                else if (prop->scope & _RK_MED)
                        importance = "medium";
                else
                        importance = "low";

                fprintf(fp, " | %-10s | ", importance);

                if (prop->scope & _RK_EXPERIMENTAL)
                        fprintf(fp, "**EXPERIMENTAL**: "
                                "subject to change or removal. ");

                if (prop->scope & _RK_DEPRECATED)
                        fprintf(fp, "**DEPRECATED** ");

                /* If the original property is an alias, prefix the
                 * description saying so. */
                if (prop0->type == _RK_C_ALIAS)
                        fprintf(fp, "Alias for `%s`: ", prop0->sdef);

                fprintf(fp, "%s <br>*Type: %s*\n", prop->desc,
                        typeinfo);
        }
        fprintf(fp, "\n");
        fprintf(fp, "### C/P legend: C = Consumer, P = Producer, * = both\n");
}




/**
 * @name Configuration value methods
 *
 * @remark This generic interface will eventually replace the config property
 *         used above.
 * @{
 */


/**
 * @brief Set up an enabled INT confval with the given range and default;
 *        the current value is initialized to the default \p vdef.
 *
 * @param name Property name, must be a const static string (will not be copied)
 * @param vmin Stored as the confval's minimum value.
 * @param vmax Stored as the confval's maximum value.
 * @param vdef Stored as both the default and the initial value.
 */
void rd_kafka_confval_init_int (rd_kafka_confval_t *confval,
                                const char *name,
                                int vmin, int vmax, int vdef) {
        confval->name = name;
        confval->is_enabled = 1;
        confval->valuetype = RD_KAFKA_CONFVAL_INT;
        confval->u.INT.vmin = vmin;
        confval->u.INT.vmax = vmax;
        confval->u.INT.vdef = vdef;
        confval->u.INT.v    = vdef;
}

/**
 * @brief Set up an enabled PTR confval with its pointer value
 *        initialized to NULL.
 *
 * @param name Property name, must be a const static string (will not be copied)
 */
void rd_kafka_confval_init_ptr (rd_kafka_confval_t *confval,
                                const char *name) {
        confval->name = name;
        confval->is_enabled = 1;
        confval->valuetype = RD_KAFKA_CONFVAL_PTR;
        confval->u.PTR = NULL;
}

/**
 * @brief Set up but disable a confval: any attempt to set this confval
 *        through rd_kafka_confval_set_type() will fail.
 *
 * @param confval Configuration value to disable.
 * @param name Property name, must be a const static string (will not be
 *             copied). It is retained so that it can be included in the
 *             error string when a set attempt is rejected.
 */
void rd_kafka_confval_disable (rd_kafka_confval_t *confval, const char *name) {
        confval->is_enabled = 0;
        confval->name       = name;
}

/**
 * @brief Set confval's value to \p valuep, verifying the passed
 *        \p valuetype matches (or can be cast to) \p confval's type.
 *
 * @param confval is the configuration value to update.
 * @param valuetype is the type of the value that \p valuep points to.
 *        An INT confval accepts INT and STR (parsed with strtol()) values,
 *        a STR confval only accepts STR values.
 * @param valuep is a pointer to the value, or NULL to revert an INT or STR
 *        confval to its default. For PTR confvals \p valuep itself is
 *        stored as the new value.
 * @param errstr is written with a human readable error string on failure.
 * @param errstr_size is the size of \p errstr.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the new value was set,
 *          RD_KAFKA_RESP_ERR__INVALID_TYPE if a string value could not be
 *          parsed as an integer, or
 *          RD_KAFKA_RESP_ERR__INVALID_ARG if the confval is disabled or the
 *          value was of incorrect type, out of range, or otherwise not a
 *          valid value.
 */
rd_kafka_resp_err_t
rd_kafka_confval_set_type (rd_kafka_confval_t *confval,
                           rd_kafka_confval_type_t valuetype,
                           const void *valuep,
                           char *errstr, size_t errstr_size) {

        if (!confval->is_enabled) {
                rd_snprintf(errstr, errstr_size,
                            "\"%s\" is not supported for this operation",
                            confval->name);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        switch (confval->valuetype)
        {
        case RD_KAFKA_CONFVAL_INT:
        {
                int v;
                const char *end;

                if (!valuep) {
                        /* Revert to default */
                        confval->u.INT.v = confval->u.INT.vdef;
                        confval->is_set = 0;
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

                switch (valuetype)
                {
                case RD_KAFKA_CONFVAL_INT:
                        v = *(const int *)valuep;
                        break;
                case RD_KAFKA_CONFVAL_STR:
                        v = (int)strtol((const char *)valuep, (char **)&end, 0);
                        /* No characters consumed: not a number at all */
                        if (end == (const char *)valuep) {
                                rd_snprintf(errstr, errstr_size,
                                            "Invalid value type for \"%s\": "
                                            "expecting integer",
                                            confval->name);
                                return RD_KAFKA_RESP_ERR__INVALID_TYPE;
                        }
                        break;
                default:
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value type for \"%s\": "
                                    "expecting integer", confval->name);
                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
                }


                /* The range is only enforced if at least one of the
                 * limits is non-zero. */
                if ((confval->u.INT.vmin || confval->u.INT.vmax) &&
                    (v < confval->u.INT.vmin || v > confval->u.INT.vmax)) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value type for \"%s\": "
                                    "expecting integer in range %d..%d",
                                    confval->name,
                                    confval->u.INT.vmin,
                                    confval->u.INT.vmax);
                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
                }

                confval->u.INT.v = v;
                confval->is_set = 1;
        }
        break;

        case RD_KAFKA_CONFVAL_STR:
        {
                size_t vlen;
                const char *v = (const char *)valuep;

                if (!valuep) {
                        /* Revert to default.
                         * Return right away: there is no value to validate
                         * and falling through would pass NULL to strlen().
                         * The default is duplicated before the current
                         * value is released so that the old string is not
                         * leaked. */
                        char *vdef = NULL;

                        if (confval->u.STR.vdef)
                                vdef = rd_strdup(confval->u.STR.vdef);

                        if (confval->u.STR.v)
                                rd_free(confval->u.STR.v);

                        confval->u.STR.v = vdef;
                        confval->is_set = 0;
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

                if (valuetype != RD_KAFKA_CONFVAL_STR) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value type for \"%s\": "
                                    "expecting string", confval->name);
                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
                }

                /* The length is only enforced if at least one of the
                 * limits is non-zero. */
                vlen = strlen(v);
                if ((confval->u.STR.minlen || confval->u.STR.maxlen) &&
                    (vlen < confval->u.STR.minlen ||
                     vlen > confval->u.STR.maxlen)) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value for \"%s\": "
                                    "expecting string with length "
                                    "%"PRIusz"..%"PRIusz,
                                    confval->name,
                                    confval->u.STR.minlen,
                                    confval->u.STR.maxlen);
                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
                }

                if (confval->u.STR.v)
                        rd_free(confval->u.STR.v);

                confval->u.STR.v = rd_strdup(v);
                /* Mirror the INT case: the value is now explicitly set */
                confval->is_set = 1;
        }
        break;

        case RD_KAFKA_CONFVAL_PTR:
                /* The pointer itself is the value, it is not dereferenced
                 * and \p valuetype is not checked. */
                confval->u.PTR = (void *)valuep;
                break;

        default:
                RD_NOTREACHED();
                return RD_KAFKA_RESP_ERR__NOENT;
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @returns the current integer value of \p confval.
 *
 * @remark \p confval must be an INT confval, this is asserted.
 */
int rd_kafka_confval_get_int (const rd_kafka_confval_t *confval) {
        int value;

        rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_INT);

        value = confval->u.INT.v;
        return value;
}


/**
 * @returns the current string value of \p confval. The returned pointer
 *          is the confval's own string, not a copy.
 *
 * @remark \p confval must be a STR confval, this is asserted.
 */
const char *rd_kafka_confval_get_str (const rd_kafka_confval_t *confval) {
        const char *value;

        rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_STR);

        value = confval->u.STR.v;
        return value;
}

/**
 * @returns the pointer currently stored in \p confval.
 *
 * @remark \p confval must be a PTR confval, this is asserted.
 */
void *rd_kafka_confval_get_ptr (const rd_kafka_confval_t *confval) {
        void *value;

        rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_PTR);

        value = confval->u.PTR;
        return value;
}


/* True if C is an ASCII letter or digit.
 * Note that C is evaluated multiple times. */
#define _is_alphanum(C) (                                       \
                ((C) >= 'a' && (C) <= 'z') ||                   \
                ((C) >= 'A' && (C) <= 'Z') ||                   \
                ((C) >= '0' && (C) <= '9'))

/**
 * @brief Check if \p str is a KIP-511 safe string: it may only contain
 *        'a-zA-Z0-9.-' and must begin and end with an alphanumeric.
 *        The empty string is considered safe.
 *
 * @returns true if the string is KIP-511 safe, else false.
 */
static rd_bool_t rd_kafka_sw_str_is_safe (const char *str) {
        const char *p;

        /* Nothing to object to in an empty string */
        if (*str == '\0')
                return rd_true;

        /* Must begin with a-zA-Z0-9 */
        if (!_is_alphanum(*str))
                return rd_false;

        /* Every character must be in the accepted set */
        for (p = str ; *p != '\0' ; p++) {
                const int c = (int)*p;

                if (_is_alphanum(c))
                        continue;

                if (unlikely(c != '-' && c != '.'))
                        return rd_false;
        }

        /* p is now at the terminator and the string is non-empty,
         * so p[-1] is the last character: it must be a-zA-Z0-9 too. */
        return _is_alphanum(p[-1]) ? rd_true : rd_false;
}


/**
 * @brief Sanitize KIP-511 software name/version strings in-place,
 *        replacing unaccepted characters with "-".
 *
 * Leading and trailing non-alphanumeric characters are stripped, so the
 * result may be shorter than the input, and is the empty string if \p str
 * does not contain any alphanumeric character at all.
 *
 * @param str NUL-terminated string to sanitize.
 *
 * @warning The \p str is modified in-place.
 */
static void rd_kafka_sw_str_sanitize_inplace (char *str) {
        char *s = str, *d = str;

        /* Strip any leading non-alphanums.
         * The terminator is not an alphanum either, so it must be checked
         * for explicitly or an empty or all-punctuation string would make
         * the scan run past the end of the buffer. */
        while (*s && !_is_alphanum(*s))
                s++;

        /* Copy the remainder down to the start of the buffer,
         * substituting unaccepted characters. */
        for (; *s ; s++, d++) {
                int c = (int)*s;

                if (unlikely(!(_is_alphanum(c) ||
                               c == '-' || c == '.')))
                        *d = '-';
                else
                        *d = *s;
        }

        *d = '\0';

        /* Strip any trailing non-alphanums.
         * d always points at the current terminator and is never moved
         * to before the start of the buffer. */
        while (d > str && !_is_alphanum(*(d-1)))
                *--d = '\0';
}

#undef _is_alphanum


/**
 * @brief Verify configuration \p conf is
 *        correct/non-conflicting and finalize the configuration
 *        settings for use.
 *
 * Properties that were not explicitly set by the user are automatically
 * adjusted to be consistent with the rest of the configuration, while
 * conflicting values that were explicitly set result in an error.
 *
 * @param cltype is the type of client the configuration will be used for.
 * @param conf is the configuration to verify, it is modified in-place.
 *
 * @returns an error string if configuration is incorrect, else NULL.
 */
const char *rd_kafka_conf_finalize (rd_kafka_type_t cltype,
                                    rd_kafka_conf_t *conf) {
        const char *errstr;

        if (!conf->sw_name)
                rd_kafka_conf_set(conf, "client.software.name", "librdkafka",
                                  NULL, 0);
        if (!conf->sw_version)
                rd_kafka_conf_set(conf, "client.software.version",
                                  rd_kafka_version_str(),
                                  NULL, 0);

        /* The client.software.name and .version are sent to the broker
         * with the ApiVersionRequest starting with AK 2.4.0 (KIP-511).
         * These strings need to be sanitized or the broker will reject them,
         * so modify them in-place here. */
        rd_assert(conf->sw_name && conf->sw_version);
        rd_kafka_sw_str_sanitize_inplace(conf->sw_name);
        rd_kafka_sw_str_sanitize_inplace(conf->sw_version);

        /* Verify mandatory configuration */
        if (!conf->socket_cb)
                return "Mandatory config property `socket_cb` not set";

        if (!conf->open_cb)
                return "Mandatory config property `open_cb` not set";

#if WITH_SSL
        if (conf->ssl.keystore_location && !conf->ssl.keystore_password)
                return "`ssl.keystore.password` is mandatory when "
                        "`ssl.keystore.location` is set";
        if (conf->ssl.ca && conf->ssl.ca_location)
                return "`ssl.ca.location`, and memory-based "
                       "set_ssl_cert(CERT_CA) are mutually exclusive.";
#endif

#if WITH_SASL_OAUTHBEARER
        if (conf->sasl.enable_oauthbearer_unsecure_jwt &&
            conf->sasl.oauthbearer_token_refresh_cb)
                return "`enable.sasl.oauthbearer.unsecure.jwt` and "
                        "`oauthbearer_token_refresh_cb` are mutually exclusive";
#endif

        if (cltype == RD_KAFKA_CONSUMER) {

                /* Automatically adjust `fetch.max.bytes` to be >=
                 * `message.max.bytes` and <= `queued.max.message.kbytes`
                 * unless set by user. */
                if (rd_kafka_conf_is_modified(conf, "fetch.max.bytes")) {
                        if (conf->fetch_max_bytes < conf->max_msg_size)
                                return "`fetch.max.bytes` must be >= "
                                        "`message.max.bytes`";
                } else {
                        conf->fetch_max_bytes = RD_MAX(
                                RD_MIN(conf->fetch_max_bytes,
                                       conf->queued_max_msg_kbytes * 1024),
                                conf->max_msg_size);
                }

                /* Automatically adjust 'receive.message.max.bytes' to
                 * be 512 bytes larger than 'fetch.max.bytes' to have enough
                 * room for protocol framing (including topic name), unless
                 * set by user. */
                if (rd_kafka_conf_is_modified(conf,
                                              "receive.message.max.bytes")) {
                        if (conf->fetch_max_bytes + 512 >
                            conf->recv_max_msg_size)
                                return "`receive.message.max.bytes` must be >= "
                                        "`fetch.max.bytes` + 512";
                } else {
                        conf->recv_max_msg_size =
                                RD_MAX(conf->recv_max_msg_size,
                                       conf->fetch_max_bytes + 512);
                }

                if (conf->max_poll_interval_ms <
                    conf->group_session_timeout_ms)
                        return "`max.poll.interval.ms` must be >= "
                                "`session.timeout.ms`";

                /* Simplifies rd_kafka_is_idempotent() which is producer-only */
                conf->eos.idempotence = 0;

        } else if (cltype == RD_KAFKA_PRODUCER) {
                if (conf->eos.transactional_id) {
                        if (!conf->eos.idempotence) {
                                /* Auto enable idempotence unless
                                 * explicitly disabled */
                                if (rd_kafka_conf_is_modified(
                                            conf, "enable.idempotence"))
                                        return "`transactional.id` requires "
                                                "`enable.idempotence=true`";

                                conf->eos.idempotence = rd_true;
                        }

                        /* Make sure at least one request can be sent
                         * before the transaction times out. */
                        if (!rd_kafka_conf_is_modified(conf,
                                                       "socket.timeout.ms"))
                                conf->socket_timeout_ms =
                                        RD_MAX(conf->eos.
                                               transaction_timeout_ms - 100,
                                               900);
                        else if (conf->eos.transaction_timeout_ms + 100 <
                                 conf->socket_timeout_ms)
                                return "`socket.timeout.ms` must be set <= "
                                        "`transaction.timeout.ms` + 100";
                }

                if (conf->eos.idempotence) {
                        /* Adjust configuration values for idempotent producer*/

                        if (rd_kafka_conf_is_modified(conf, "max.in.flight")) {
                                if (conf->max_inflight >
                                    RD_KAFKA_IDEMP_MAX_INFLIGHT)
                                        return "`max.in.flight` must be "
                                                "set <= "
                                                RD_KAFKA_IDEMP_MAX_INFLIGHT_STR
                                                " when `enable.idempotence` "
                                                "is true";
                        } else {
                                conf->max_inflight =
                                        RD_MIN(conf->max_inflight,
                                               RD_KAFKA_IDEMP_MAX_INFLIGHT);
                        }


                        if (rd_kafka_conf_is_modified(conf, "retries")) {
                                if (conf->max_retries < 1)
                                        return "`retries` must be set >= 1 "
                                                "when `enable.idempotence` is "
                                                "true";
                        } else {
                                conf->max_retries = INT32_MAX;
                        }


                        if (rd_kafka_conf_is_modified(
                                    conf,
                                    "queue.buffering.backpressure.threshold")
                            && conf->queue_backpressure_thres > 1)
                                return "`queue.buffering.backpressure.threshold` "
                                        "must be set to 1 when "
                                        "`enable.idempotence` is true";
                        else
                                conf->queue_backpressure_thres = 1;

                        /* acks=all and queuing.strategy are set
                         * in topic_conf_finalize() */

                } else {
                        if (conf->eos.gapless &&
                            rd_kafka_conf_is_modified(
                                    conf, "enable.gapless.guarantee"))
                                return "`enable.gapless.guarantee` requires "
                                        "`enable.idempotence` to be enabled";
                }

                /* Default the sticky partitioner linger to twice the
                 * linger.ms, capped at 900000 ms, unless set by user. */
                if (!rd_kafka_conf_is_modified(
                            conf, "sticky.partitioning.linger.ms"))
                        conf->sticky_partition_linger_ms = (int) RD_MIN(900000,
                                (rd_ts_t) (2 * conf->buffering_max_ms_dbl));
        }


        /* Derive metadata.max.age.ms from the refresh interval
         * unless set by user. */
        if (!rd_kafka_conf_is_modified(conf, "metadata.max.age.ms") &&
            conf->metadata_refresh_interval_ms > 0)
                conf->metadata_max_age_ms =
                        conf->metadata_refresh_interval_ms * 3;

        if (conf->reconnect_backoff_max_ms < conf->reconnect_backoff_ms)
                return "`reconnect.backoff.max.ms` must be >= "
                        "`reconnect.backoff.ms`";

        if (conf->sparse_connections) {
                /* Set sparse connection random selection interval to
                 * reconnect.backoff.ms / 2, clamped to 11..1000. */
                conf->sparse_connect_intvl =
                        RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms/2, 1000));
        }

        if (!rd_kafka_conf_is_modified(conf, "allow.auto.create.topics")) {
                /* Consumer: Do not allow auto create by default.
                 * Producer: Allow auto create by default. */
                if (cltype == RD_KAFKA_CONSUMER)
                        conf->allow_auto_create_topics = rd_false;
                else if (cltype == RD_KAFKA_PRODUCER)
                        conf->allow_auto_create_topics = rd_true;
        }

        /* Finalize and verify the default.topic.config */
        if (conf->topic_conf) {

                if (cltype == RD_KAFKA_PRODUCER) {
                        rd_kafka_topic_conf_t *tconf = conf->topic_conf;

                        if (tconf->message_timeout_ms != 0 &&
                            (double)tconf->message_timeout_ms <=
                            conf->buffering_max_ms_dbl) {
                                /* linger.ms is a global property (its value
                                 * is conf->buffering_max_ms_dbl), so its
                                 * modified state must be looked up on
                                 * the global configuration object. */
                                if (rd_kafka_conf_is_modified(
                                            conf, "linger.ms"))
                                        return "`message.timeout.ms` must be "
                                                "greater than `linger.ms`";
                                else /* Auto adjust linger.ms to be lower
                                      * than message.timeout.ms */
                                        conf->buffering_max_ms_dbl =
                                                (double)tconf->
                                                message_timeout_ms - 0.1;
                        }
                }

                errstr = rd_kafka_topic_conf_finalize(cltype, conf,
                                                      conf->topic_conf);
                if (errstr)
                        return errstr;
        }

        /* Convert double linger.ms to internal int microseconds after
         * finalizing default_topic_conf since it may
         * update buffering_max_ms_dbl. */
        conf->buffering_max_us = (rd_ts_t)(conf->buffering_max_ms_dbl * 1000);


        return NULL;
}


/**
 * @brief Verify topic configuration \p tconf is
 *        correct/non-conflicting and finalize the configuration
 *        settings for use.
 *
 * Only producer topic configuration is verified, for other client types
 * this is a no-op.
 *
 * @param cltype is the type of client the configuration will be used for.
 * @param conf is the (read-only) global configuration that \p tconf is
 *        verified against.
 * @param tconf is the topic configuration to verify, it is modified
 *        in-place.
 *
 * @returns an error string if configuration is incorrect, else NULL.
 */
const char *rd_kafka_topic_conf_finalize (rd_kafka_type_t cltype,
                                          const rd_kafka_conf_t *conf,
                                          rd_kafka_topic_conf_t *tconf) {

        if (cltype != RD_KAFKA_PRODUCER)
                return NULL;

        if (conf->eos.idempotence) {
                /* Ensure acks=all */
                if (rd_kafka_topic_conf_is_modified(tconf, "acks")) {
                        if (tconf->required_acks != -1)
                                return "`acks` must be set to `all` when "
                                        "`enable.idempotence` is true";
                } else {
                        tconf->required_acks = -1; /* all */
                }

                /* Ensure FIFO queueing */
                if (rd_kafka_topic_conf_is_modified(tconf,
                                                    "queuing.strategy")) {
                        if (tconf->queuing_strategy != RD_KAFKA_QUEUE_FIFO)
                                return "`queuing.strategy` must be set to "
                                        "`fifo` when `enable.idempotence` is "
                                        "true";
                } else {
                        tconf->queuing_strategy = RD_KAFKA_QUEUE_FIFO;
                }

                /* Ensure message.timeout.ms <= transaction.timeout.ms */
                if (conf->eos.transactional_id) {
                        if (!rd_kafka_topic_conf_is_modified(
                                    tconf, "message.timeout.ms"))
                                tconf->message_timeout_ms =
                                        conf->eos.transaction_timeout_ms;
                        else if (tconf->message_timeout_ms >
                                 conf->eos.transaction_timeout_ms)
                                return "`message.timeout.ms` must be set <= "
                                        "`transaction.timeout.ms`";
                }
        }

        /* linger.ms is a global property (its value is
         * conf->buffering_max_ms_dbl), so its modified state must be
         * looked up on the global configuration object, not on tconf. */
        if (tconf->message_timeout_ms != 0 &&
            (double)tconf->message_timeout_ms <= conf->buffering_max_ms_dbl &&
            rd_kafka_conf_is_modified(conf, "linger.ms"))
                return "`message.timeout.ms` must be greater than `linger.ms`";

        return NULL;
}


/**
 * @brief Log warnings for set deprecated or experimental
 *        configuration properties, as well as for set properties that
 *        only apply to the other client type (e.g., consumer properties
 *        set on a producer instance).
 *
 * @param rk is the client instance, used for logging and to determine
 *        the client type.
 * @param scope is the property scope (_RK_GLOBAL or _RK_TOPIC) that
 *        \p conf belongs to.
 * @param conf is the configuration object matching \p scope.
 *
 * @returns the number of properties that were warned about. A property
 *          that triggers both kinds of warning is counted once.
 */
static int rd_kafka_anyconf_warn_deprecated (rd_kafka_t *rk,
                                             rd_kafka_conf_scope_t scope,
                                             const void *conf) {
        const struct rd_kafka_property *prop;
        /* Properties of the opposite client type are warned about */
        int warn_type = rk->rk_type == RD_KAFKA_PRODUCER ?
                _RK_CONSUMER : _RK_PRODUCER;
        const int dep_and_exp = _RK_DEPRECATED|_RK_EXPERIMENTAL;
        int warn_on = dep_and_exp|warn_type;

        int cnt = 0;

        for (prop = rd_kafka_properties; prop->name ; prop++) {
                int match = prop->scope & warn_on;

                if (likely(!(prop->scope & scope) || !match))
                        continue;

                if (likely(!rd_kafka_anyconf_is_modified(conf, prop)))
                        continue;

                /* At least one of the deprecated/experimental flags is set.
                 * The " and " separator depends on those two flags only,
                 * regardless of whether the warn_type flag is set too. */
                if (match != warn_type)
                        rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                                     "Configuration property %s is %s%s%s: %s",
                                     prop->name,
                                     match & _RK_DEPRECATED ? "deprecated" : "",
                                     (match & dep_and_exp) == dep_and_exp ?
                                     " and " : "",
                                     match & _RK_EXPERIMENTAL ?
                                     "experimental" : "",
                                     prop->desc);

                if (match & warn_type)
                        rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                                     "Configuration property %s "
                                     "is a %s property and will be ignored by "
                                     "this %s instance",
                                     prop->name,
                                     warn_type == _RK_PRODUCER ?
                                     "producer" : "consumer",
                                     warn_type == _RK_PRODUCER ?
                                     "consumer" : "producer");

                cnt++;
        }

        return cnt;
}


/**
 * @brief Log configuration warnings (deprecated configuration properties,
 *        unrecommended combinations, etc).
 *
 * @param rk is the client instance whose configuration is checked.
 *
 * @returns the number of configuration properties warned about by
 *          rd_kafka_anyconf_warn_deprecated() for the global and default
 *          topic configuration. The additional warnings logged by this
 *          function itself are not included in the count.
 *
 * @locality any
 * @locks none
 */
int rd_kafka_conf_warn (rd_kafka_t *rk) {
        int cnt;

        cnt = rd_kafka_anyconf_warn_deprecated(rk, _RK_GLOBAL, &rk->rk_conf);
        if (rk->rk_conf.topic_conf)
                cnt += rd_kafka_anyconf_warn_deprecated(
                        rk, _RK_TOPIC, rk->rk_conf.topic_conf);

        /* Additional warnings */
        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                if (rk->rk_conf.fetch_wait_max_ms + 1000 >
                    rk->rk_conf.socket_timeout_ms)
                        rd_kafka_log(rk, LOG_WARNING,
                                     "CONFWARN",
                                     "Configuration property "
                                     "`fetch.wait.max.ms` (%d) should be "
                                     "set lower than `socket.timeout.ms` (%d) "
                                     "by at least 1000ms to avoid blocking "
                                     "and timing out sub-sequent requests",
                                     rk->rk_conf.fetch_wait_max_ms,
                                     rk->rk_conf.socket_timeout_ms);
        }

        if (rd_kafka_conf_is_modified(&rk->rk_conf, "sasl.mechanisms") &&
            !(rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
              rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT)) {
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Configuration property `sasl.mechanism` set to "
                             "`%s` but `security.protocol` is not configured "
                             "for SASL: recommend setting "
                             "`security.protocol` to SASL_SSL or "
                             "SASL_PLAINTEXT",
                             rk->rk_conf.sasl.mechanisms);
        }

        if (rd_kafka_conf_is_modified(&rk->rk_conf, "sasl.username") &&
            !(!strncmp(rk->rk_conf.sasl.mechanisms, "SCRAM", 5) ||
              !strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")))
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Configuration property `sasl.username` only "
                             "applies when `sasl.mechanism` is set to "
                             "PLAIN or SCRAM-SHA-..");

        if (rd_kafka_conf_is_modified(&rk->rk_conf, "client.software.name") &&
            !rd_kafka_sw_str_is_safe(rk->rk_conf.sw_name))
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Configuration property `client.software.name` "
                             "may only contain 'a-zA-Z0-9.-', other characters "
                             "will be replaced with '-'");

        if (rd_kafka_conf_is_modified(&rk->rk_conf, "client.software.version") &&
            !rd_kafka_sw_str_is_safe(rk->rk_conf.sw_version))
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Configuration property `client.software.version` "
                             "may only contain 'a-zA-Z0-9.-', other characters "
                             "will be replaced with '-'");

        if (rd_atomic32_get(&rk->rk_broker_cnt) == 0)
                rd_kafka_log(rk, LOG_NOTICE, "CONFWARN",
                             "No `bootstrap.servers` configured: "
                             "client will not be able to connect "
                             "to Kafka cluster");

        return cnt;
}


/**
 * @returns a read-only pointer to the configuration object embedded
 *          in the client instance \p rk.
 */
const rd_kafka_conf_t *rd_kafka_conf (rd_kafka_t *rk) {
        const rd_kafka_conf_t *conf = &rk->rk_conf;

        return conf;
}


/**
 * @brief Unittests for the configuration interface.
 *
 * Exercises, in order:
 *  - setting an unknown property: must yield RD_KAFKA_CONF_UNKNOWN and a
 *    non-empty error string,
 *  - set / get / is_modified round-trips over every entry in
 *    rd_kafka_properties, for both global and topic scoped properties,
 *  - is_modified() when a property is set through its alias,
 *  - the values of client.software.name and client.software.version as
 *    read back after rd_kafka_conf_finalize().
 *
 * Failures are reported through RD_UT_ASSERT(), success through RD_UT_PASS().
 */
int unittest_conf (void) {
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *tconf;
        rd_kafka_conf_res_t res, res2;
        char errstr[128];
        int iteration;
        const struct rd_kafka_property *prop;
        char readval[512];
        size_t readlen;
        const char *errstr2;

        conf = rd_kafka_conf_new();
        tconf = rd_kafka_topic_conf_new();

        /* Unknown properties must be rejected with an error string. */
        res = rd_kafka_conf_set(conf, "unknown.thing", "foo",
                                errstr, sizeof(errstr));
        RD_UT_ASSERT(res == RD_KAFKA_CONF_UNKNOWN, "fail");
        RD_UT_ASSERT(*errstr, "fail");

        for (iteration = 0 ; iteration < 5 ; iteration++) {
                int cnt;


                /* Iterations:
                 *  0 - Check is_modified
                 *  1 - Set every other config property, read back and verify.
                 *  2 - Check is_modified.
                 *  3 - Set all config properties, read back and verify.
                 *  4 - Check is_modified. */
                for (prop = rd_kafka_properties, cnt = 0 ; prop->name ;
                     prop++, cnt++) {
                        const char *val;
                        char tmp[64];
                        /* cnt is advanced for skipped properties too,
                         * so odd/even is based on the table position. */
                        int odd = cnt & 1;
                        int do_set = iteration == 3 || (iteration == 1 && odd);
                        rd_bool_t is_modified;
                        /* Unsupported properties can't be set and thus
                         * never become modified. Odd properties are
                         * modified from iteration 1 and onwards,
                         * all properties from iteration 3 and onwards. */
                        int exp_is_modified = !prop->unsupported &&
                                (iteration >= 3 ||
                                 (iteration > 0 && (do_set || odd)));

                        /* conf_get() takes the buffer size as input
                         * through readlen: reset it for each property. */
                        readlen = sizeof(readval);

                        /* Avoid some special configs */
                        if (!strcmp(prop->name, "plugin.library.paths") ||
                            !strcmp(prop->name, "builtin.features"))
                                continue;

                        /* Pick a value to set based on the property type. */
                        switch (prop->type)
                        {
                        case _RK_C_STR:
                        case _RK_C_KSTR:
                        case _RK_C_PATLIST:
                                if (prop->sdef)
                                        val = prop->sdef;
                                else
                                        val = "test";
                                break;

                        case _RK_C_BOOL:
                                val = "true";
                                break;

                        case _RK_C_INT:
                                rd_snprintf(tmp, sizeof(tmp), "%d", prop->vdef);
                                val = tmp;
                                break;

                        case _RK_C_DBL:
                                rd_snprintf(tmp, sizeof(tmp), "%g", prop->ddef);
                                val = tmp;
                                break;

                        case _RK_C_S2F:
                        case _RK_C_S2I:
                                val = prop->s2i[0].str;
                                break;

                        case _RK_C_PTR:
                        case _RK_C_ALIAS:
                        case _RK_C_INVALID:
                        case _RK_C_INTERNAL:
                        default:
                                /* Not covered by this test */
                                continue;
                        }


                        if (prop->scope & _RK_GLOBAL) {
                                if (do_set)
                                        res = rd_kafka_conf_set(conf,
                                                                prop->name, val,
                                                                errstr,
                                                                sizeof(errstr));

                                res2 = rd_kafka_conf_get(conf,
                                                         prop->name,
                                                         readval, &readlen);

                                is_modified = rd_kafka_conf_is_modified(
                                        conf, prop->name);


                        } else if (prop->scope & _RK_TOPIC) {
                                if  (do_set)
                                        res = rd_kafka_topic_conf_set(
                                                tconf,
                                                prop->name, val,
                                                errstr, sizeof(errstr));

                                res2 = rd_kafka_topic_conf_get(tconf,
                                                               prop->name,
                                                               readval,
                                                               &readlen);

                                is_modified = rd_kafka_topic_conf_is_modified(
                                        tconf, prop->name);

                        } else {
                                RD_NOTREACHED();
                        }



                        if (do_set && prop->unsupported) {
                                RD_UT_ASSERT(res == RD_KAFKA_CONF_INVALID,
                                             "conf_set %s should've failed "
                                             "with CONF_INVALID, not %d: %s",
                                             prop->name, res, errstr);

                        } else if (do_set) {
                                RD_UT_ASSERT(res == RD_KAFKA_CONF_OK,
                                             "conf_set %s failed: %d: %s",
                                             prop->name, res, errstr);
                                RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK,
                                             "conf_get %s failed: %d",
                                             prop->name, res2);

                                RD_UT_ASSERT(!strcmp(readval, val),
                                             "conf_get %s "
                                             "returned \"%s\": "
                                             "expected \"%s\"",
                                             prop->name, readval, val);

                                RD_UT_ASSERT(is_modified,
                                             "Property %s was set but "
                                             "is_modified=%d",
                                             prop->name, is_modified);

                        }

                        /* Only RD_UT_ASSERT() is used here: a plain
                         * assert() on the same condition would abort
                         * before the descriptive message is printed. */
                        RD_UT_ASSERT(is_modified == exp_is_modified,
                                     "Property %s is_modified=%d, "
                                     "exp_is_modified=%d "
                                     "(iter %d, odd %d, do_set %d)",
                                     prop->name, is_modified,
                                     exp_is_modified,
                                     iteration, odd, do_set);
                }
        }

        /* Set an alias and make sure is_modified() works for it. */
        res = rd_kafka_conf_set(conf, "max.in.flight", "19", NULL, 0);
        RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);

        RD_UT_ASSERT(rd_kafka_conf_is_modified(conf, "max.in.flight") == rd_true,
                     "fail");
        RD_UT_ASSERT(rd_kafka_conf_is_modified(
                             conf,
                             "max.in.flight.requests.per.connection") == rd_true,
                     "fail");

        rd_kafka_conf_destroy(conf);
        rd_kafka_topic_conf_destroy(tconf);


        /* Verify that client.software.* string-safing works */
        conf = rd_kafka_conf_new();
        res = rd_kafka_conf_set(conf, "client.software.name",
                                " .~aba. va! !.~~", NULL, 0);
        RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);
        res = rd_kafka_conf_set(conf, "client.software.version",
                                "!1.2.3.4.5!!! a", NULL, 0);
        RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);

        errstr2 = rd_kafka_conf_finalize(RD_KAFKA_PRODUCER, conf);
        RD_UT_ASSERT(!errstr2, "conf_finalize() failed: %s", errstr2);

        /* Check the conf_get() result (res2), not the result of
         * the earlier conf_set() call (res). */
        readlen = sizeof(readval);
        res2 = rd_kafka_conf_get(conf, "client.software.name",
                                 readval, &readlen);
        RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, "%d", res2);
        RD_UT_ASSERT(!strcmp(readval, "aba.-va"),
                     "client.software.* safification failed: \"%s\"", readval);
        RD_UT_SAY("Safified client.software.name=\"%s\"", readval);

        readlen = sizeof(readval);
        res2 = rd_kafka_conf_get(conf, "client.software.version",
                                 readval, &readlen);
        RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, "%d", res2);
        RD_UT_ASSERT(!strcmp(readval, "1.2.3.4.5----a"),
                     "client.software.* safification failed: \"%s\"", readval);
        RD_UT_SAY("Safified client.software.version=\"%s\"", readval);

        rd_kafka_conf_destroy(conf);

        RD_UT_PASS();
}

/**@}*/
